/*
 * Decompiled with CFR 0.152.
 */
package org.apache.zookeeper.server;

import java.io.BufferedWriter;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.PrintWriter;
import java.io.Writer;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.net.SocketAddress;
import java.nio.ByteBuffer;
import java.security.cert.Certificate;
import java.util.Arrays;
import java.util.HashMap;
import java.util.Set;
import java.util.concurrent.atomic.AtomicLong;
import org.apache.jute.BinaryInputArchive;
import org.apache.jute.BinaryOutputArchive;
import org.apache.jute.Record;
import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.proto.ReplyHeader;
import org.apache.zookeeper.proto.WatcherEvent;
import org.apache.zookeeper.server.NettyServerCnxnFactory;
import org.apache.zookeeper.server.ServerCnxn;
import org.apache.zookeeper.server.ServerCnxnFactory;
import org.apache.zookeeper.server.ServerStats;
import org.apache.zookeeper.server.ZooKeeperSaslServer;
import org.apache.zookeeper.server.ZooKeeperServer;
import org.apache.zookeeper.server.ZooTrace;
import org.apache.zookeeper.server.command.CommandExecutor;
import org.apache.zookeeper.server.command.FourLetterCommands;
import org.apache.zookeeper.server.command.SetTraceMaskCommand;
import org.jboss.netty.buffer.ChannelBuffer;
import org.jboss.netty.buffer.ChannelBuffers;
import org.jboss.netty.channel.Channel;
import org.jboss.netty.channel.ChannelEvent;
import org.jboss.netty.channel.ChannelFuture;
import org.jboss.netty.channel.MessageEvent;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * A {@link ServerCnxn} whose transport is a Netty {@link Channel}.
 *
 * <p>Incoming bytes are handed to {@link #receiveMessage(ChannelBuffer)}, which
 * reassembles length-prefixed packets (a 4-byte length followed by that many
 * payload bytes) and forwards them to the {@link ZooKeeperServer}. Before the
 * connection is initialized, the 4-byte prefix is first checked against the
 * table of four-letter commands.
 *
 * <p>Outgoing data goes through {@link #sendBuffer(ByteBuffer)}, which writes
 * to the channel.
 */
public class NettyServerCnxn extends ServerCnxn {
    private static final Logger LOG = LoggerFactory.getLogger(NettyServerCnxn.class);

    /** Placeholder written at the front of a response; later overwritten with the real length. */
    private static final byte[] fourBytes = new byte[4];

    /** Number of buffered characters above which {@link SendBufferWriter} flushes on its own. */
    private static final int SEND_BUFFER_FLUSH_THRESHOLD = 2048;

    /** The Netty channel this connection reads from and writes to. */
    Channel channel;
    /** Not referenced in this class; presumably used by the factory's channel handler — TODO confirm. */
    ChannelBuffer queuedBuffer;
    /** Set by {@link #disableRecvNoWait()}, cleared by {@link #enableRecv()}; stops the receive loop. */
    volatile boolean throttled;
    /** Payload buffer of the packet currently being assembled, or {@code null} while reading a length prefix. */
    ByteBuffer bb;
    /** Accumulates the 4-byte length prefix of the next packet. */
    ByteBuffer bbLen = ByteBuffer.allocate(4);
    long sessionId;
    int sessionTimeout;
    /** Incremented per processed packet, decremented per response with a positive xid. */
    AtomicLong outstandingCount = new AtomicLong();
    Certificate[] clientChain;
    private volatile ZooKeeperServer zkServer;
    NettyServerCnxnFactory factory;
    /** Becomes {@code true} once the first packet has been passed to {@code processConnectRequest}. */
    boolean initialized;

    /**
     * Creates a connection bound to the given channel.
     *
     * @param channel the Netty channel backing this connection
     * @param zks     the server that will process packets; may be {@code null}, in which case
     *                {@link #receiveMessage(ChannelBuffer)} closes the connection on the first complete packet
     * @param factory the owning factory; if it has a login, a SASL server is created from it
     */
    NettyServerCnxn(Channel channel, ZooKeeperServer zks, NettyServerCnxnFactory factory) {
        this.channel = channel;
        this.zkServer = zks;
        this.factory = factory;
        if (this.factory.login != null) {
            this.zooKeeperSaslServer = new ZooKeeperSaslServer(factory.login);
        }
    }

    /**
     * Removes this connection from the factory's bookkeeping, closes the channel if it is
     * still open, and unregisters the connection.
     *
     * <p>If this connection is not present in {@code factory.cnxns}, the call returns without
     * doing anything further, so repeated calls are harmless.
     */
    @Override
    public void close() {
        if (LOG.isDebugEnabled()) {
            LOG.debug("close called for sessionid:0x" + Long.toHexString(this.sessionId));
        }
        synchronized (this.factory.cnxns) {
            if (!this.factory.cnxns.remove(this)) {
                if (LOG.isDebugEnabled()) {
                    LOG.debug("cnxns size:" + this.factory.cnxns.size());
                }
                return;
            }
            if (LOG.isDebugEnabled()) {
                LOG.debug("close in progress for sessionid:0x" + Long.toHexString(this.sessionId));
            }
            synchronized (this.factory.ipMap) {
                // The remote address or the per-IP set may be missing. A NullPointerException
                // here would skip the channel close and unregisterConnection below, after the
                // connection has already been removed from cnxns, so both are checked.
                SocketAddress remote = this.channel.getRemoteAddress();
                Set<NettyServerCnxn> sameIp = null;
                if (remote != null) {
                    sameIp = this.factory.ipMap.get(((InetSocketAddress) remote).getAddress());
                }
                if (sameIp != null) {
                    sameIp.remove(this);
                } else if (LOG.isDebugEnabled()) {
                    LOG.debug("No ipMap entry found for " + remote + " while closing sessionid:0x"
                            + Long.toHexString(this.sessionId));
                }
            }
        }
        if (this.channel.isOpen()) {
            this.channel.close();
        }
        this.factory.unregisterConnection(this);
    }

    @Override
    public long getSessionId() {
        return this.sessionId;
    }

    @Override
    public int getSessionTimeout() {
        return this.sessionTimeout;
    }

    /**
     * Sends a watch event to the client as a response tagged {@code "notification"}.
     * If sending fails with an {@link IOException}, the connection is closed.
     *
     * @param event the event to deliver
     */
    @Override
    public void process(WatchedEvent event) {
        // Header values (-1, -1L, 0) are presumably the conventional notification header — TODO confirm.
        ReplyHeader h = new ReplyHeader(-1, -1L, 0);
        if (LOG.isTraceEnabled()) {
            // NOTE(review): 64L looks like ZooTrace's event-delivery trace mask — confirm.
            ZooTrace.logTraceMessage(LOG, 64L, "Deliver event " + event + " to 0x" + Long.toHexString(this.sessionId) + " through " + this);
        }
        WatcherEvent e = event.getWrapper();
        try {
            this.sendResponse(h, e, "notification");
        } catch (IOException e1) {
            if (LOG.isDebugEnabled()) {
                LOG.debug("Problem sending to " + this.getRemoteSocketAddress(), e1);
            }
            this.close();
        }
    }

    /**
     * Serializes a reply header and optional record, prefixes them with their length, and
     * sends the result over the channel. Does nothing if the channel is no longer open.
     *
     * <p>After sending, if the header's xid is positive, the outstanding-request counter is
     * decremented, and receiving is re-enabled when the server no longer asks for throttling.
     *
     * @param h   the reply header
     * @param r   the record to append after the header, or {@code null} for none
     * @param tag the tag passed to the archive when writing {@code r}
     * @throws IOException declared by the overridden method; serialization failures are
     *                     logged here rather than propagated
     */
    @Override
    public void sendResponse(ReplyHeader h, Record r, String tag) throws IOException {
        if (!this.channel.isOpen()) {
            return;
        }
        ByteArrayOutputStream baos = new ByteArrayOutputStream();
        BinaryOutputArchive bos = BinaryOutputArchive.getArchive(baos);
        try {
            // Reserve room for the length prefix; it is filled in once the size is known.
            baos.write(fourBytes);
            bos.writeRecord(h, "header");
            if (r != null) {
                bos.writeRecord(r, tag);
            }
            baos.close();
        } catch (IOException e) {
            // Keep the cause in the log so serialization failures can be diagnosed.
            LOG.error("Error serializing response", e);
        }
        byte[] payload = baos.toByteArray();
        ByteBuffer response = ByteBuffer.wrap(payload);
        // Overwrite the placeholder with the body length (excluding the prefix itself).
        response.putInt(payload.length - 4).rewind();
        this.sendBuffer(response);
        if (h.getXid() > 0 && !this.zkServer.shouldThrottle(this.outstandingCount.decrementAndGet())) {
            this.enableRecv();
        }
    }

    @Override
    public void setSessionId(long sessionId) {
        this.sessionId = sessionId;
    }

    /**
     * Clears the throttled flag, if set, and pushes a {@link ResumeMessageEvent} upstream
     * through the channel pipeline.
     */
    @Override
    public void enableRecv() {
        if (this.throttled) {
            this.throttled = false;
            if (LOG.isDebugEnabled()) {
                LOG.debug("Sending unthrottle event " + this);
            }
            this.channel.getPipeline().sendUpstream(new ResumeMessageEvent(this.channel));
        }
    }

    /**
     * Writes the buffer to the channel and records a sent packet. The sentinel
     * {@code ServerCnxnFactory.closeConn} closes the connection instead of being written.
     *
     * @param sendBuffer the data to send, or the close sentinel
     */
    @Override
    public void sendBuffer(ByteBuffer sendBuffer) {
        if (sendBuffer == ServerCnxnFactory.closeConn) {
            this.close();
            return;
        }
        this.channel.write(ChannelBuffers.wrappedBuffer(sendBuffer));
        this.packetSent();
    }

    /**
     * Treats {@code len} as a possible four-letter command code and runs the command if it is one.
     *
     * @param channel the channel the request arrived on
     * @param message the remaining unread input; eight more bytes are consumed for the
     *                set-trace-mask command
     * @param len     the 4-byte value read from the start of the request
     * @return {@code false} if {@code len} is not a known command; {@code true} for the
     *         set-trace-mask command; otherwise the result of {@code CommandExecutor.execute}
     * @throws IOException declared for callers; not thrown directly by the statements visible here
     */
    private boolean checkFourLetterWord(Channel channel, ChannelBuffer message, int len) throws IOException {
        String cmd = FourLetterCommands.cmd2String.get(len);
        if (cmd == null) {
            return false;
        }
        // Clear the channel's interest ops and wait for that to take effect before running the command.
        channel.setInterestOps(0).awaitUninterruptibly();
        LOG.info("Processing " + cmd + " command from " + channel.getRemoteAddress());
        this.packetReceived();
        PrintWriter pwriter = new PrintWriter(new BufferedWriter(new SendBufferWriter()));
        if (len == FourLetterCommands.setTraceMaskCmd) {
            // This command carries an 8-byte trace mask after the command word.
            ByteBuffer mask = ByteBuffer.allocate(8);
            message.readBytes(mask);
            mask.flip();
            long traceMask = mask.getLong();
            ZooTrace.setTextTraceLevel(traceMask);
            SetTraceMaskCommand setMask = new SetTraceMaskCommand(pwriter, this, traceMask);
            setMask.start();
            return true;
        }
        CommandExecutor commandExecutor = new CommandExecutor();
        return commandExecutor.execute(this, pwriter, len, this.zkServer, this.factory);
    }

    /**
     * Consumes bytes from {@code message}, assembling and dispatching length-prefixed packets
     * until the input is exhausted or the connection becomes throttled.
     *
     * <p>Partial length prefixes and partial payloads are kept in {@link #bbLen} and
     * {@link #bb} between calls. Any {@link IOException} (no server, invalid length, or one
     * thrown while processing) closes the connection.
     *
     * @param message the newly received bytes
     */
    public void receiveMessage(ChannelBuffer message) {
        try {
            while (message.readable() && !this.throttled) {
                if (this.bb != null) {
                    // A payload is in progress: copy as many bytes as are available.
                    if (LOG.isTraceEnabled()) {
                        LOG.trace("message readable " + message.readableBytes() + " bb len " + this.bb.remaining() + " " + this.bb);
                        ByteBuffer dat = this.bb.duplicate();
                        dat.flip();
                        LOG.trace(Long.toHexString(this.sessionId) + " bb 0x" + ChannelBuffers.hexDump(ChannelBuffers.copiedBuffer(dat)));
                    }
                    if (this.bb.remaining() > message.readableBytes()) {
                        // Cap the limit so readBytes does not ask for more than the message holds.
                        int newLimit = this.bb.position() + message.readableBytes();
                        this.bb.limit(newLimit);
                    }
                    message.readBytes(this.bb);
                    this.bb.limit(this.bb.capacity());
                    if (LOG.isTraceEnabled()) {
                        LOG.trace("after readBytes message readable " + message.readableBytes() + " bb len " + this.bb.remaining() + " " + this.bb);
                        ByteBuffer dat = this.bb.duplicate();
                        dat.flip();
                        LOG.trace("after readbytes " + Long.toHexString(this.sessionId) + " bb 0x" + ChannelBuffers.hexDump(ChannelBuffers.copiedBuffer(dat)));
                    }
                    if (this.bb.remaining() != 0) {
                        // Payload still incomplete; wait for more input.
                        continue;
                    }
                    this.packetReceived();
                    this.bb.flip();
                    ZooKeeperServer zks = this.zkServer;
                    if (zks == null) {
                        throw new IOException("ZK down");
                    }
                    if (this.initialized) {
                        zks.processPacket(this, this.bb);
                        if (zks.shouldThrottle(this.outstandingCount.incrementAndGet())) {
                            this.disableRecvNoWait();
                        }
                    } else {
                        if (LOG.isDebugEnabled()) {
                            LOG.debug("got conn req request from " + this.getRemoteSocketAddress());
                        }
                        zks.processConnectRequest(this, this.bb);
                        this.initialized = true;
                    }
                    this.bb = null;
                    continue;
                }
                // No payload in progress: accumulate the 4-byte length prefix.
                if (LOG.isTraceEnabled()) {
                    LOG.trace("message readable " + message.readableBytes() + " bblenrem " + this.bbLen.remaining());
                    ByteBuffer dat = this.bbLen.duplicate();
                    dat.flip();
                    LOG.trace(Long.toHexString(this.sessionId) + " bbLen 0x" + ChannelBuffers.hexDump(ChannelBuffers.copiedBuffer(dat)));
                }
                if (message.readableBytes() < this.bbLen.remaining()) {
                    this.bbLen.limit(this.bbLen.position() + message.readableBytes());
                }
                message.readBytes(this.bbLen);
                this.bbLen.limit(this.bbLen.capacity());
                if (this.bbLen.remaining() != 0) {
                    // Prefix still incomplete; wait for more input.
                    continue;
                }
                this.bbLen.flip();
                if (LOG.isTraceEnabled()) {
                    LOG.trace(Long.toHexString(this.sessionId) + " bbLen 0x" + ChannelBuffers.hexDump(ChannelBuffers.copiedBuffer(this.bbLen)));
                }
                int len = this.bbLen.getInt();
                if (LOG.isTraceEnabled()) {
                    LOG.trace(Long.toHexString(this.sessionId) + " bbLen len is " + len);
                }
                this.bbLen.clear();
                // Only an uninitialized connection may issue a four-letter command.
                if (!this.initialized && this.checkFourLetterWord(this.channel, message, len)) {
                    return;
                }
                if (len < 0 || len > BinaryInputArchive.maxBuffer) {
                    throw new IOException("Len error " + len);
                }
                this.bb = ByteBuffer.allocate(len);
            }
        } catch (IOException e) {
            LOG.warn("Closing connection to " + this.getRemoteSocketAddress(), e);
            this.close();
        }
    }

    /** Marks the connection throttled and waits until the channel has been made unreadable. */
    @Override
    public void disableRecv() {
        this.disableRecvNoWait().awaitUninterruptibly();
    }

    /**
     * Marks the connection throttled and asks the channel to stop reading, without waiting.
     *
     * @return the future returned by {@code channel.setReadable(false)}
     */
    private ChannelFuture disableRecvNoWait() {
        this.throttled = true;
        if (LOG.isDebugEnabled()) {
            LOG.debug("Throttling - disabling recv " + this);
        }
        return this.channel.setReadable(false);
    }

    @Override
    public long getOutstandingRequests() {
        return this.outstandingCount.longValue();
    }

    @Override
    public void setSessionTimeout(int sessionTimeout) {
        this.sessionTimeout = sessionTimeout;
    }

    @Override
    public int getInterestOps() {
        return this.channel.getInterestOps();
    }

    @Override
    public InetSocketAddress getRemoteSocketAddress() {
        return (InetSocketAddress) this.channel.getRemoteAddress();
    }

    /** Closes the connection by sending the {@code ServerCnxnFactory.closeConn} sentinel. */
    @Override
    public void sendCloseSession() {
        this.sendBuffer(ServerCnxnFactory.closeConn);
    }

    /**
     * @return the server's statistics, or {@code null} when no server is attached
     */
    @Override
    protected ServerStats serverStats() {
        if (this.zkServer == null) {
            return null;
        }
        return this.zkServer.serverStats();
    }

    @Override
    public boolean isSecure() {
        return this.factory.secure;
    }

    /**
     * @return a copy of the stored client certificate chain, or {@code null} if none is set
     */
    @Override
    public Certificate[] getClientCertificateChain() {
        if (this.clientChain == null) {
            return null;
        }
        return Arrays.copyOf(this.clientChain, this.clientChain.length);
    }

    /**
     * Stores a copy of the given chain, or clears it when {@code chain} is {@code null}.
     *
     * @param chain the client certificate chain, or {@code null}
     */
    @Override
    public void setClientCertificateChain(Certificate[] chain) {
        this.clientChain = chain == null ? null : Arrays.copyOf(chain, chain.length);
    }

    /**
     * A {@link Writer} that collects characters and forwards them to
     * {@link NettyServerCnxn#sendBuffer(ByteBuffer)} when flushed, closed, or once more than
     * {@link #SEND_BUFFER_FLUSH_THRESHOLD} characters have accumulated.
     */
    private class SendBufferWriter extends Writer {
        /** Pending output; {@code null} once the writer has been closed. */
        private StringBuffer sb = new StringBuffer();

        private SendBufferWriter() {
        }

        /**
         * Sends the pending characters if {@code force} is set and there is something to
         * send, or if the pending length exceeds the threshold.
         */
        private void checkFlush(boolean force) {
            if (force && this.sb.length() > 0 || this.sb.length() > SEND_BUFFER_FLUSH_THRESHOLD) {
                // NOTE(review): getBytes() uses the platform default charset; consider an explicit charset.
                NettyServerCnxn.this.sendBuffer(ByteBuffer.wrap(this.sb.toString().getBytes()));
                this.sb.setLength(0);
            }
        }

        /** Fails with {@link IOException}, as the Writer contract requires, once closed. */
        private void ensureOpen() throws IOException {
            if (this.sb == null) {
                throw new IOException("SendBufferWriter is closed");
            }
        }

        /** Flushes pending output and marks the writer closed; later calls have no effect. */
        @Override
        public void close() throws IOException {
            if (this.sb == null) {
                return;
            }
            this.checkFlush(true);
            this.sb = null;
        }

        @Override
        public void flush() throws IOException {
            this.ensureOpen();
            this.checkFlush(true);
        }

        @Override
        public void write(char[] cbuf, int off, int len) throws IOException {
            this.ensureOpen();
            this.sb.append(cbuf, off, len);
            this.checkFlush(false);
        }
    }

    /**
     * A {@link MessageEvent} that carries only the channel. It is sent upstream by
     * {@link NettyServerCnxn#enableRecv()}; message, remote address and future are all {@code null}.
     */
    static class ResumeMessageEvent implements MessageEvent {
        Channel channel;

        ResumeMessageEvent(Channel channel) {
            this.channel = channel;
        }

        @Override
        public Object getMessage() {
            return null;
        }

        @Override
        public SocketAddress getRemoteAddress() {
            return null;
        }

        @Override
        public Channel getChannel() {
            return this.channel;
        }

        @Override
        public ChannelFuture getFuture() {
            return null;
        }
    }
}

