/*
 * Decompiled with CFR 0.152.
 */
package org.fisco.bcos.channel.proxy;

import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.socket.SocketChannel;
import io.netty.util.HashedWheelTimer;
import io.netty.util.Timer;
import java.io.UnsupportedEncodingException;
import java.security.SecureRandom;
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
import javax.net.ssl.SSLException;
import org.fisco.bcos.channel.handler.ChannelConnections;
import org.fisco.bcos.channel.handler.ConnectionInfo;
import org.fisco.bcos.channel.handler.Message;
import org.fisco.bcos.channel.protocol.ChannelMessageError;
import org.fisco.bcos.channel.protocol.ChannelMessageType;
import org.fisco.bcos.channel.proxy.ConnectionPair;
import org.fisco.bcos.web3j.protocol.ObjectMapperFactory;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;

public class Server {
    private static Logger logger = LoggerFactory.getLogger(Server.class);
    private ChannelConnections localConnections = new ChannelConnections();
    private ChannelConnections remoteConnections;
    private Map<String, ConnectionPair> seq2Connections = new ConcurrentHashMap<String, ConnectionPair>();
    private Integer bindPort = 8830;
    private Timer timeoutHandler = new HashedWheelTimer();
    private ThreadPoolTaskExecutor threadPool;

    public ChannelConnections getLocalConnections() {
        return this.localConnections;
    }

    public void setLocalConnections(ChannelConnections localConnections) {
        this.localConnections = localConnections;
    }

    public ChannelConnections getRemoteConnections() {
        return this.remoteConnections;
    }

    public void setRemoteConnections(ChannelConnections connections) {
        this.remoteConnections = connections;
    }

    public Map<String, ConnectionPair> getSeq2Connections() {
        return this.seq2Connections;
    }

    public void setSeq2Connections(Map<String, ConnectionPair> seq2Connections) {
        this.seq2Connections = seq2Connections;
    }

    public Integer getBindPort() {
        return this.bindPort;
    }

    public void setBindPort(Integer bindPort) {
        this.bindPort = bindPort;
    }

    public Timer getTimeoutHandler() {
        return this.timeoutHandler;
    }

    public void setTimeoutHandler(Timer timeoutHandler) {
        this.timeoutHandler = timeoutHandler;
    }

    public void run() throws SSLException {
        logger.debug("init ProxyServer");
        try {
            ConnectionCallback localConnectionCallback = new ConnectionCallback();
            localConnectionCallback.setServer(this);
            localConnectionCallback.setFromRemote(false);
            ConnectionCallback remoteConnectionCallback = new ConnectionCallback();
            remoteConnectionCallback.setServer(this);
            remoteConnectionCallback.setFromRemote(true);
            this.localConnections.setCallback(localConnectionCallback);
            this.localConnections.init();
            this.localConnections.startListen(this.bindPort);
            this.remoteConnections.setCallback(remoteConnectionCallback);
            this.remoteConnections.init();
            this.remoteConnections.setThreadPool(this.threadPool);
            this.remoteConnections.startConnect();
        }
        catch (Exception e) {
            logger.error("error ", (Throwable)e);
            throw e;
        }
    }

    public void broadcastTopic() {
        this.broadcastTopic(null);
    }

    public void broadcastTopic(ChannelHandlerContext ctx) {
        try {
            Message message = new Message();
            message.setResult(0);
            message.setType((short)ChannelMessageType.AMOP_CLIENT_TOPICS.getType());
            message.setSeq(UUID.randomUUID().toString().replaceAll("-", ""));
            HashSet<String> allTopics = new HashSet<String>();
            for (ConnectionInfo connectionInfo : this.localConnections.getConnections()) {
                ChannelHandlerContext localCtx = this.localConnections.getNetworkConnectionByHost(connectionInfo.getHost(), connectionInfo.getPort());
                if (localCtx == null || !localCtx.channel().isActive()) continue;
                logger.debug("node:{}:{} follow topics: {}", new Object[]{connectionInfo.getHost(), connectionInfo.getPort(), connectionInfo.getTopics()});
                allTopics.addAll(connectionInfo.getTopics());
            }
            message.setData(ObjectMapperFactory.getObjectMapper().writeValueAsBytes((Object)allTopics.toArray()));
            logger.debug("all topics: {}", (Object)new String(message.getData()));
            if (ctx == null) {
                for (String key : this.remoteConnections.getNetworkConnections().keySet()) {
                    ChannelHandlerContext remoteCtx = this.remoteConnections.getNetworkConnections().get(key);
                    if (remoteCtx == null || !remoteCtx.channel().isActive()) continue;
                    ByteBuf out = remoteCtx.alloc().buffer();
                    message.writeHeader(out);
                    message.writeExtra(out);
                    if (remoteCtx == null || !remoteCtx.channel().isActive()) continue;
                    logger.debug("send topic {}:{}", (Object)((SocketChannel)remoteCtx.channel()).remoteAddress().getAddress().getHostAddress(), (Object)((SocketChannel)remoteCtx.channel()).remoteAddress().getPort());
                    remoteCtx.writeAndFlush((Object)out);
                }
            } else {
                logger.debug("topic send to {}:{}", (Object)((SocketChannel)ctx.channel()).remoteAddress().getAddress().getHostAddress(), (Object)((SocketChannel)ctx.channel()).remoteAddress().getPort());
                ByteBuf out = ctx.alloc().buffer();
                message.writeHeader(out);
                message.writeExtra(out);
                ctx.writeAndFlush((Object)out);
            }
        }
        catch (Exception e) {
            logger.error("error ", (Throwable)e);
        }
    }

    public void onLocalMessage(ChannelHandlerContext ctx, Message message) {
        try {
            logger.debug("sdk request: " + message.getSeq());
            ChannelHandlerContext remoteCtx = null;
            ConnectionPair pair = this.seq2Connections.get(message.getSeq());
            if (pair != null) {
                logger.debug("seq existed");
                remoteCtx = pair.remoteConnection;
                if (message.getType().shortValue() != ChannelMessageType.AMOP_RESPONSE.getType()) {
                    pair.localConnection = ctx;
                }
                ByteBuf out = remoteCtx.alloc().buffer();
                message.writeHeader(out);
                message.writeExtra(out);
                logger.debug("msg send to:{}:{}", (Object)((SocketChannel)remoteCtx.channel()).remoteAddress().getAddress().getHostAddress(), (Object)((SocketChannel)remoteCtx.channel()).remoteAddress().getPort());
                remoteCtx.writeAndFlush((Object)out);
            } else {
                pair = new ConnectionPair();
                pair.localConnection = ctx;
                pair.setServer(this);
                pair.setMessage(message);
                logger.debug("other type message\uff0cConnectionPair");
                pair.setRemoteChannelConnections(this.remoteConnections);
                ArrayList<ConnectionInfo> remoteConnectionInfos = new ArrayList<ConnectionInfo>();
                remoteConnectionInfos.addAll(this.remoteConnections.getConnections());
                pair.setRemoteConnectionInfos(remoteConnectionInfos);
                this.seq2Connections.put(message.getSeq(), pair);
                pair.init();
                pair.retrySendRemoteMessage();
            }
        }
        catch (Exception e) {
            logger.error("error ", (Throwable)e);
        }
    }

    public void onRemoteMessage(ChannelHandlerContext ctx, Message message) {
        try {
            ByteBuf out;
            logger.debug("processing : " + message.getSeq());
            ChannelHandlerContext localCtx = null;
            ConnectionPair pair = this.seq2Connections.get(message.getSeq());
            if (message.getType() == (short)ChannelMessageType.AMOP_REQUEST.getType()) {
                Short length = message.getData()[0];
                String topic = new String(message.getData(), 1, length - 1);
                HashSet<ChannelHandlerContext> topicCtxs = new HashSet<ChannelHandlerContext>();
                for (ConnectionInfo connectionInfo : this.localConnections.getConnections()) {
                    ChannelHandlerContext topicCtx;
                    if (!connectionInfo.getTopics().contains(topic) || (topicCtx = this.localConnections.getNetworkConnectionByHost(connectionInfo.getHost(), connectionInfo.getPort())) == null || !topicCtx.channel().isActive()) continue;
                    topicCtxs.add(topicCtx);
                }
                logger.debug("send topic:{} sum{} follow topic", (Object)topic, (Object)topicCtxs.size());
                if (topicCtxs.isEmpty()) {
                    logger.error("connection not found\uff0cerror 99");
                    message.setType((short)ChannelMessageType.AMOP_RESPONSE.getType());
                    message.setResult(ChannelMessageError.NODES_UNREACHABLE.getError());
                    ByteBuf out2 = ctx.alloc().buffer();
                    message.writeHeader(out2);
                    message.writeExtra(out2);
                    ctx.writeAndFlush((Object)out2);
                    return;
                }
                SecureRandom random = new SecureRandom();
                Integer index = random.nextInt(topicCtxs.size());
                ChannelHandlerContext target = (ChannelHandlerContext)topicCtxs.toArray()[index];
                logger.debug("send to {}:{}", (Object)((SocketChannel)target.channel()).remoteAddress().getAddress().getHostAddress(), (Object)((SocketChannel)target.channel()).remoteAddress().getPort());
                localCtx = target;
                if (pair == null) {
                    pair = new ConnectionPair();
                    pair.localConnection = localCtx;
                    pair.remoteConnection = ctx;
                    pair.setServer(this);
                    pair.setMessage(message);
                    this.seq2Connections.put(message.getSeq(), pair);
                    pair.init();
                } else {
                    pair.remoteConnection = ctx;
                }
            } else if (pair != null) {
                logger.debug("seq existed");
                localCtx = pair.localConnection;
                if (message.getResult() != 0 && message.getType().shortValue() == ChannelMessageType.AMOP_RESPONSE.getType()) {
                    logger.error("endpoint error:{}\uff0cretry", (Object)message.getResult());
                    pair.retrySendRemoteMessage();
                    return;
                }
                pair.remoteConnection = ctx;
            } else {
                localCtx = this.localConnections.randomNetworkConnection(null);
            }
            if (localCtx == null || !localCtx.channel().isActive()) {
                logger.error("connect unavailable\uff0cerror 99");
                message.setType((short)ChannelMessageType.AMOP_RESPONSE.getType());
                message.setResult(ChannelMessageError.NODES_UNREACHABLE.getError());
                out = ctx.alloc().buffer();
                message.writeHeader(out);
                message.writeExtra(out);
                ctx.writeAndFlush((Object)out);
                return;
            }
            out = localCtx.alloc().buffer();
            message.writeHeader(out);
            message.writeExtra(out);
            logger.debug("send to:{}:{}", (Object)((SocketChannel)localCtx.channel()).remoteAddress().getAddress().getHostAddress(), (Object)((SocketChannel)localCtx.channel()).remoteAddress().getPort());
            localCtx.writeAndFlush((Object)out);
        }
        catch (Exception e) {
            logger.error("error ", (Throwable)e);
        }
    }

    public void onHeartBeat(ChannelHandlerContext ctx, Message message) {
        String content = "1";
        try {
            content = new String(message.getData(), "utf-8");
        }
        catch (UnsupportedEncodingException e) {
            logger.error("unexpected heartbeat ");
        }
        catch (Exception e) {
            logger.error("heartbeat error");
        }
        if (content.equals("0")) {
            Message response = new Message();
            response.setSeq(message.getSeq());
            response.setResult(0);
            response.setType((short)ChannelMessageType.CLIENT_HEARTBEAT.getType());
            response.setData("1".getBytes());
            ByteBuf out = ctx.alloc().buffer();
            response.writeHeader(out);
            response.writeExtra(out);
            ctx.writeAndFlush((Object)out);
        } else if (content.equals("1")) {
            // empty if block
        }
    }

    public void onTopic(ChannelHandlerContext ctx, Message message) {
        logger.debug("SDK topics message: {} {}", (Object)message.getSeq(), (Object)new String(message.getData()));
        String host = ((SocketChannel)ctx.channel()).remoteAddress().getAddress().getHostAddress();
        Integer port = ((SocketChannel)ctx.channel()).remoteAddress().getPort();
        ConnectionInfo info = this.localConnections.getConnectionInfo(host, port);
        if (info != null) {
            try {
                List topics = (List)ObjectMapperFactory.getObjectMapper().readValue(message.getData(), List.class);
                info.setTopics(topics);
                this.broadcastTopic();
            }
            catch (Exception e) {
                logger.error("parse topic error", (Throwable)e);
            }
        }
    }

    public ThreadPoolTaskExecutor getThreadPool() {
        return this.threadPool;
    }

    public void setThreadPool(ThreadPoolTaskExecutor threadPool) {
        this.threadPool = threadPool;
    }

    class ConnectionCallback
    implements ChannelConnections.Callback {
        private Server server;
        private Boolean fromRemote = false;

        ConnectionCallback() {
        }

        public Server getServer() {
            return this.server;
        }

        public void setServer(Server server) {
            this.server = server;
        }

        public Boolean getFromRemote() {
            return this.fromRemote;
        }

        public void setFromRemote(Boolean fromRemote) {
            this.fromRemote = fromRemote;
        }

        /*
         * WARNING - Removed try catching itself - possible behaviour change.
         */
        @Override
        public void onMessage(ChannelHandlerContext ctx, ByteBuf message) {
            try {
                Message msg = new Message();
                msg.readHeader(message);
                msg.readExtra(message);
                logger.debug("receive Message type: {}", (Object)msg.getType());
                if (msg.getType().shortValue() == ChannelMessageType.AMOP_REQUEST.getType() || msg.getType().shortValue() == ChannelMessageType.AMOP_RESPONSE.getType()) {
                    logger.debug("channel2");
                } else {
                    if (msg.getType().shortValue() == ChannelMessageType.AMOP_CLIENT_TOPICS.getType()) {
                        logger.debug("topic");
                        Server.this.onTopic(ctx, msg);
                        return;
                    }
                    if (msg.getType().shortValue() == ChannelMessageType.CHANNEL_RPC_REQUEST.getType()) {
                        logger.debug("ethereum");
                    } else {
                        if (msg.getType().shortValue() == ChannelMessageType.CLIENT_HEARTBEAT.getType()) {
                            Server.this.onHeartBeat(ctx, msg);
                            return;
                        }
                        if (msg.getType().shortValue() == ChannelMessageType.TRANSACTION_NOTIFY.getType()) {
                            logger.debug("transaction message call back.");
                        } else {
                            logger.error("unknown message:{}", (Object)msg.getType());
                        }
                    }
                }
                if (this.fromRemote.booleanValue()) {
                    logger.debug("remote message");
                    this.server.onRemoteMessage(ctx, msg);
                } else {
                    logger.debug("local message");
                    this.server.onLocalMessage(ctx, msg);
                }
            }
            finally {
                message.release();
            }
        }

        @Override
        public void onConnect(ChannelHandlerContext ctx) {
            if (this.fromRemote.booleanValue()) {
                try {
                    logger.debug("endpoint connection established\uff0csend topic");
                    Server.this.broadcastTopic(ctx);
                }
                catch (Exception e) {
                    logger.error("error ", (Throwable)e);
                }
            }
        }

        @Override
        public void onDisconnect(ChannelHandlerContext ctx) {
            if (!this.fromRemote.booleanValue()) {
                logger.debug("SDK disconnect\uff0cupdate and broadcast topic");
                Server.this.broadcastTopic();
            }
        }

        @Override
        public void sendHeartbeat(ChannelHandlerContext ctx) {
            SocketChannel socketChannel = (SocketChannel)ctx.channel();
            String hostAddress = socketChannel.remoteAddress().getAddress().getHostAddress();
            int port = socketChannel.remoteAddress().getPort();
            String remoteEndPoint = hostAddress + ":" + port;
            logger.trace("proxy server send heart beat message, remote host is {}", (Object)remoteEndPoint);
        }
    }
}

