/*
 * Decompiled with CFR 0.152.
 */
package com.gettyio.core.channel;

import com.gettyio.core.buffer.ChunkPool;
import com.gettyio.core.channel.ChannelState;
import com.gettyio.core.channel.SocketChannel;
import com.gettyio.core.channel.config.BaseConfig;
import com.gettyio.core.pipeline.ChannelPipeline;
import com.gettyio.core.util.LinkedBlockQueue;
import com.gettyio.core.util.ThreadPool;
import java.io.IOException;
import java.net.DatagramPacket;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.DatagramChannel;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.util.Iterator;
import java.util.concurrent.TimeoutException;

/**
 * Channel implementation backed by a {@link DatagramChannel}.
 *
 * <p>Inbound datagrams are read on a worker task started by {@link #starRead()} and
 * handed to the pipeline as {@link DatagramPacket} instances. Outbound objects are
 * placed on an internal queue and sent by a second worker task started from the
 * constructor.
 *
 * <p>NOTE(review): the writer task and the reader task are both submitted to the same
 * {@code ThreadPool}; presumably {@code workerThreadNum} must be at least 2 for both
 * to run concurrently -- confirm against {@code ThreadPool}.
 */
public class UdpChannel
extends SocketChannel {
    /** Underlying datagram channel used for both receive and send. */
    private final DatagramChannel datagramChannel;
    /** Selector polled by the read loop for readable keys. */
    private final Selector selector;
    /** Outbound objects waiting to be sent by the writer task. */
    private final LinkedBlockQueue<Object> queue;
    /** Pool that runs the reader and writer tasks. */
    private final ThreadPool workerThreadPool;

    /**
     * Creates the channel, initialises the pipeline, starts the writer task and fires
     * {@code ChannelState.NEW_CHANNEL} through the pipeline.
     *
     * @param datagramChannel the datagram channel to read from and write to
     * @param selector        the selector polled by the read loop
     * @param config          configuration (read buffer size, queue size, pool block time)
     * @param chunkPool       pool from which read and write buffers are borrowed
     * @param channelPipeline pipeline initialiser invoked with this channel
     * @param workerThreadNum value passed to the worker {@code ThreadPool}
     * @throws RuntimeException if the pipeline initialiser throws
     */
    public UdpChannel(DatagramChannel datagramChannel, Selector selector, BaseConfig config, ChunkPool chunkPool, ChannelPipeline channelPipeline, int workerThreadNum) {
        this.datagramChannel = datagramChannel;
        this.selector = selector;
        this.config = config;
        this.chunkPool = chunkPool;
        this.workerThreadPool = new ThreadPool(0, workerThreadNum);
        this.queue = new LinkedBlockQueue<Object>(config.getBufferWriterQueueSize());
        try {
            channelPipeline.initChannel(this);
        }
        catch (Exception e) {
            throw new RuntimeException("channelPipeline init exception", e);
        }
        this.loopWrite();
        try {
            this.invokePipeline(ChannelState.NEW_CHANNEL);
        }
        catch (Exception e) {
            // A failing NEW_CHANNEL handler is logged but does not abort construction.
            logger.error(e);
        }
    }

    /**
     * Submits the read loop to the worker pool. The loop runs until
     * {@code selector.select()} returns a non-positive value or an exception is thrown;
     * exceptions are logged and end the loop.
     */
    @Override
    public void starRead() {
        this.initiateClose = false;
        this.workerThreadPool.execute(new Runnable(){

            @Override
            public void run() {
                try {
                    UdpChannel.this.readLoop();
                }
                catch (InterruptedException e) {
                    // Preserve the interrupt so the pool / caller can observe it.
                    Thread.currentThread().interrupt();
                    SocketChannel.logger.error(e);
                }
                catch (Exception e) {
                    SocketChannel.logger.error(e);
                }
            }
        });
    }

    /**
     * Polls the selector and receives one datagram for every readable key.
     *
     * @throws Exception any failure from the selector, the channel, the buffer pool or
     *                   the pipeline; the caller logs it and stops reading
     */
    private void readLoop() throws Exception {
        while (this.selector.select() > 0) {
            Iterator<SelectionKey> it = this.selector.selectedKeys().iterator();
            while (it.hasNext()) {
                SelectionKey sk = it.next();
                // Every handled key must be removed from the selected-key set, otherwise
                // it is reported again on the next select even if nothing is ready.
                it.remove();
                if (sk.isReadable()) {
                    this.receiveDatagram();
                }
            }
        }
    }

    /**
     * Receives a single datagram into a pooled buffer and forwards its payload to the
     * pipeline as a {@link DatagramPacket}. The pooled buffer is always returned.
     *
     * @throws Exception if allocation, the receive, or the pipeline fails
     */
    private void receiveDatagram() throws Exception {
        ByteBuffer readBuffer = this.chunkPool.allocate(this.config.getReadBufferSize(), this.config.getChunkPoolBlockTime());
        try {
            InetSocketAddress address = (InetSocketAddress)this.datagramChannel.receive(readBuffer);
            if (address == null) {
                // Non-blocking receive found no datagram; nothing to deliver.
                return;
            }
            readBuffer.flip();
            if (readBuffer.hasRemaining()) {
                // Copy the payload out of the pooled buffer so it can be reused immediately.
                byte[] bytes = new byte[readBuffer.remaining()];
                readBuffer.get(bytes, 0, bytes.length);
                this.readToPipeline(new DatagramPacket(bytes, bytes.length, address));
            }
        }
        finally {
            this.chunkPool.deallocate(readBuffer);
        }
    }

    /**
     * Submits the writer task, which sends queued objects until {@code queue.poll()}
     * returns {@code null} or the task is interrupted.
     */
    private void loopWrite() {
        this.workerThreadPool.execute(new Runnable(){

            @Override
            public void run() {
                try {
                    Object obj;
                    while ((obj = UdpChannel.this.queue.poll()) != null) {
                        UdpChannel.this.send(obj);
                    }
                }
                catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                    SocketChannel.logger.error(e.getMessage(), e);
                }
            }
        });
    }

    /**
     * Closes the datagram channel, notifies the channel future listener if one is set,
     * marks the channel closed and cleans the pipeline. Calling it again once the status
     * is {@code 1} only logs a warning.
     */
    @Override
    public void close() {
        // Status 1 is the value this class treats as "closed" (see assertChannel()).
        if (this.status == 1) {
            logger.warn("Channel:{} is closed:", (Object)this.getChannelId());
            return;
        }
        try {
            this.datagramChannel.close();
        }
        catch (IOException e) {
            logger.error(e);
        }
        if (this.channelFutureListener != null) {
            this.channelFutureListener.operationComplete(this);
        }
        this.status = 1;
        if (this.defaultChannelPipeline != null) {
            this.defaultChannelPipeline.clean();
            this.defaultChannelPipeline = null;
        }
    }

    /**
     * Records the {@code initiateClose} flag and then closes the channel.
     *
     * @param initiateClose value stored in the inherited {@code initiateClose} field
     */
    @Override
    public synchronized void close(boolean initiateClose) {
        this.initiateClose = initiateClose;
        this.close();
    }

    /**
     * Queues {@code obj} for sending by the writer task.
     *
     * @param obj the object to send; the writer task expects a {@link DatagramPacket}
     */
    @Override
    public void writeAndFlush(Object obj) {
        this.enqueue(obj);
    }

    /**
     * Queues {@code obj} for sending; behaves exactly like {@link #writeAndFlush(Object)}.
     *
     * @param obj the object to send; the writer task expects a {@link DatagramPacket}
     */
    @Override
    @Deprecated
    public void writeToChannel(Object obj) {
        this.enqueue(obj);
    }

    /**
     * Puts {@code obj} on the outbound queue. An interruption is logged and the
     * thread's interrupt flag is restored; the object is not queued in that case.
     */
    private void enqueue(Object obj) {
        try {
            this.queue.put(obj);
        }
        catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            logger.error(e.getMessage(), e);
        }
    }

    /**
     * Returns the local address the datagram channel is bound to.
     *
     * @return the channel's local address
     * @throws IOException if the channel is closed or the address cannot be obtained
     */
    @Override
    public InetSocketAddress getLocalAddress() throws IOException {
        this.assertChannel();
        return (InetSocketAddress)this.datagramChannel.getLocalAddress();
    }

    /**
     * Verifies that the channel is still usable.
     *
     * @throws IOException if the status is {@code 1} or there is no datagram channel
     */
    private void assertChannel() throws IOException {
        if (this.status == 1 || this.datagramChannel == null) {
            throw new IOException("channel is closed");
        }
    }

    /**
     * Sends one queued object, which must be a {@link DatagramPacket}, to the packet's
     * socket address. Failures are logged and do not propagate; the pooled buffer is
     * always returned.
     *
     * @param obj the queued object to send
     */
    private void send(Object obj) {
        try {
            DatagramPacket datagramPacket = (DatagramPacket)obj;
            int length = datagramPacket.getLength();
            ByteBuffer byteBuffer = this.chunkPool.allocate(length, this.config.getChunkPoolBlockTime());
            try {
                // Copy only the packet's payload window: getData() exposes the whole
                // backing array, which may be larger than getLength() or start at an offset.
                byteBuffer.put(datagramPacket.getData(), datagramPacket.getOffset(), length);
                byteBuffer.flip();
                this.datagramChannel.send(byteBuffer, datagramPacket.getSocketAddress());
            }
            finally {
                this.chunkPool.deallocate(byteBuffer);
            }
        }
        catch (ClassCastException e) {
            logger.error(e.getMessage(), e);
        }
        catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            logger.error(e);
        }
        catch (IOException e) {
            logger.error(e);
        }
        catch (TimeoutException e) {
            logger.error(e);
        }
    }
}

