/*
 * Decompiled with CFR 0.152.
 */
package com.alibaba.otter.canal.client.impl;

import com.alibaba.otter.canal.client.CanalConnector;
import com.alibaba.otter.canal.client.impl.running.ClientRunningData;
import com.alibaba.otter.canal.client.impl.running.ClientRunningListener;
import com.alibaba.otter.canal.client.impl.running.ClientRunningMonitor;
import com.alibaba.otter.canal.common.utils.AddressUtils;
import com.alibaba.otter.canal.common.utils.BooleanMutex;
import com.alibaba.otter.canal.common.zookeeper.ZkClientx;
import com.alibaba.otter.canal.protocol.CanalEntry;
import com.alibaba.otter.canal.protocol.CanalPacket;
import com.alibaba.otter.canal.protocol.ClientIdentity;
import com.alibaba.otter.canal.protocol.Message;
import com.alibaba.otter.canal.protocol.exception.CanalClientException;
import com.google.protobuf.ByteString;
import com.google.protobuf.InvalidProtocolBufferException;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.SocketAddress;
import java.nio.ByteBuffer;
import java.nio.ByteOrder;
import java.nio.channels.SocketChannel;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.TimeUnit;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class SimpleCanalConnector
implements CanalConnector {
    private static final Logger logger = LoggerFactory.getLogger(SimpleCanalConnector.class);
    // Server address supplied to the constructor; doConnect() reads it through getAddress().
    private SocketAddress address;
    // Username sent in the ClientAuth packet (an empty string is sent when null).
    private String username;
    // Stored and exposed via getPassword(); not referenced by doConnect() in this class.
    private String password;
    // Passed to Socket.setSoTimeout and sent to the server as the net read/write timeout.
    private int soTimeout = 60000;
    // When non-null, re-subscribed automatically right after a connection is established.
    private String filter;
    // Reusable 4-byte big-endian buffers holding the length prefix of incoming/outgoing packets.
    private final ByteBuffer readHeader = ByteBuffer.allocate(4).order(ByteOrder.BIG_ENDIAN);
    private final ByteBuffer writeHeader = ByteBuffer.allocate(4).order(ByteOrder.BIG_ENDIAN);
    // Channel opened by doConnect() and closed/nulled by doDisconnnect().
    private SocketChannel channel;
    // Filled from the server handshake in doConnect().
    private List<CanalPacket.Compression> supportedCompressions = new ArrayList<CanalPacket.Compression>();
    // Destination and client id included in every request packet.
    private ClientIdentity clientIdentity;
    // Created by initClientRunningMonitor() only when a ZkClientx has been set; null otherwise.
    private ClientRunningMonitor runningMonitor;
    private ZkClientx zkClientx;
    // Set to true in processActiveEnter and false in processActiveExit; waitClientRunning() calls get() on it.
    private BooleanMutex mutex = new BooleanMutex(Boolean.valueOf(false));
    private volatile boolean connected = false;
    // Whether connect() issues a rollback() right after connecting.
    private boolean rollbackOnConnect = true;
    // Whether disconnect() issues a rollback() before tearing the connection down.
    private boolean rollbackOnDisConnect = false;
    // Monitors serialising packet reads and packet writes respectively.
    private Object readDataLock = new Object();
    private Object writeDataLock = new Object();

    /**
     * Creates a connector with a socket timeout of 60000.
     *
     * @param address     server address to connect to
     * @param username    username sent during authentication
     * @param password    password kept on this connector
     * @param destination destination name used to build the client identity
     */
    public SimpleCanalConnector(SocketAddress address, String username, String password, String destination) {
        this(address, username, password, destination, 60000);
    }

    /**
     * Creates a connector; no network activity happens until {@link #connect()} is called.
     *
     * @param address     server address to connect to
     * @param username    username sent during authentication
     * @param password    password kept on this connector
     * @param destination destination name used to build the client identity
     * @param soTimeout   value applied via {@code Socket.setSoTimeout} and sent as the net read/write timeout
     */
    public SimpleCanalConnector(SocketAddress address, String username, String password, String destination, int soTimeout) {
        this.address = address;
        this.username = username;
        this.password = password;
        this.soTimeout = soTimeout;
        // 1001 is the second ClientIdentity argument -- presumably a fixed client id; confirm against ClientIdentity.
        this.clientIdentity = new ClientIdentity(destination, 1001);
    }

    /**
     * Establishes the connection unless this connector is already flagged as connected.
     *
     * <p>Without a running monitor the socket is opened directly, the stored filter (if any)
     * is re-subscribed and, when {@code rollbackOnConnect} is set, a rollback is issued.
     * With a running monitor the monitor is started if it is not started yet; the actual
     * connection is then made from its listener callback.
     *
     * @throws CanalClientException if connecting, subscribing or rolling back fails
     */
    @Override
    public void connect() throws CanalClientException {
        if (!this.connected) {
            if (this.runningMonitor == null) {
                // Direct mode: connect immediately and restore the previous session state.
                this.waitClientRunning();
                this.doConnect();
                if (this.filter != null) {
                    this.subscribe(this.filter);
                }
                if (this.rollbackOnConnect) {
                    this.rollback();
                }
            } else if (!this.runningMonitor.isStart()) {
                // Monitored mode: the listener's processActiveEnter performs the connect.
                this.runningMonitor.start();
            }
            this.connected = true;
        }
    }

    /**
     * Tears the connection down, optionally rolling back first.
     *
     * <p>When {@code rollbackOnDisConnect} is set and a connected channel exists, a rollback
     * is sent before closing. The channel reference is null-checked so that calling this
     * method before {@link #connect()} (or twice in a row) no longer fails with a
     * {@code NullPointerException}. Cleanup runs in a {@code finally} block so the socket is
     * released and the connected flag cleared even if the rollback throws; the rollback
     * failure is still propagated to the caller.
     *
     * @throws CanalClientException if the rollback or stopping the running monitor fails
     */
    @Override
    public void disconnect() throws CanalClientException {
        try {
            // Snapshot the field: doDisconnnect() may null it from the monitor callback.
            SocketChannel current = this.channel;
            if (this.rollbackOnDisConnect && current != null && current.isConnected()) {
                this.rollback();
            }
        }
        finally {
            this.connected = false;
            if (this.runningMonitor != null) {
                if (this.runningMonitor.isStart()) {
                    // Stopping the monitor is expected to close the channel via processActiveExit.
                    this.runningMonitor.stop();
                }
            } else {
                this.doDisconnnect();
            }
        }
    }

    /**
     * Opens the socket channel and performs the handshake and authentication exchange.
     *
     * <p>Sequence: open channel, connect to {@link #getAddress()} (falling back to
     * {@link #getNextAddress()} when that is null), read the HANDSHAKE packet, send the
     * CLIENTAUTHENTICATION packet and expect an ACK without an error code.
     *
     * <p>If any step fails the freshly opened channel is closed before the exception
     * propagates, so a failed attempt does not leak a socket. The list of supported
     * compressions is replaced (not appended to) on every successful handshake so that
     * reconnects do not accumulate duplicates.
     *
     * @return the local address and port of the connected socket
     * @throws CanalClientException on I/O failure, protocol mismatch or authentication error
     */
    private InetSocketAddress doConnect() throws CanalClientException {
        SocketChannel opened = null;
        boolean established = false;
        try {
            opened = SocketChannel.open();
            this.channel = opened;
            // NOTE(review): SO_TIMEOUT is set on the channel's socket; confirm it actually bounds SocketChannel reads.
            opened.socket().setSoTimeout(this.soTimeout);
            SocketAddress address = this.getAddress();
            if (address == null) {
                address = this.getNextAddress();
            }
            opened.connect(address);

            CanalPacket.Packet p = CanalPacket.Packet.parseFrom(this.readNextPacket(opened));
            if (p.getVersion() != 1) {
                throw new CanalClientException("unsupported version at this client.");
            }
            if (p.getType() != CanalPacket.PacketType.HANDSHAKE) {
                throw new CanalClientException("expect handshake but found other type.");
            }
            CanalPacket.Handshake handshake = CanalPacket.Handshake.parseFrom(p.getBody());
            // Each connection negotiates afresh; drop whatever a previous session advertised.
            this.supportedCompressions.clear();
            this.supportedCompressions.addAll(handshake.getSupportedCompressionsList());

            CanalPacket.ClientAuth ca = CanalPacket.ClientAuth.newBuilder().setUsername(this.username != null ? this.username : "").setNetReadTimeout(this.soTimeout).setNetWriteTimeout(this.soTimeout).build();
            this.writeWithHeader(opened, CanalPacket.Packet.newBuilder().setType(CanalPacket.PacketType.CLIENTAUTHENTICATION).setBody(ca.toByteString()).build().toByteArray());

            CanalPacket.Packet ack = CanalPacket.Packet.parseFrom(this.readNextPacket(opened));
            if (ack.getType() != CanalPacket.PacketType.ACK) {
                throw new CanalClientException("unexpected packet type when ack is expected");
            }
            CanalPacket.Ack ackBody = CanalPacket.Ack.parseFrom(ack.getBody());
            if (ackBody.getErrorCode() > 0) {
                throw new CanalClientException("something goes wrong when doing authentication: " + ackBody.getErrorMessage());
            }

            InetSocketAddress local = new InetSocketAddress(opened.socket().getLocalAddress(), opened.socket().getLocalPort());
            this.connected = true;
            established = true;
            return local;
        }
        catch (IOException e) {
            throw new CanalClientException((Throwable)e);
        }
        finally {
            if (!established && opened != null) {
                // Release the socket of a failed attempt; the original failure keeps propagating.
                try {
                    opened.close();
                }
                catch (IOException closeFailure) {
                    logger.warn("exception on closing channel:{} \n {}", (Object)opened, (Object)closeFailure);
                }
            }
        }
    }

    /**
     * Closes the current channel, if there is one, and clears the field.
     *
     * <p>An {@link IOException} raised by {@code close()} is logged as a warning and
     * otherwise ignored; the field is cleared in that case as well.
     */
    private void doDisconnnect() throws CanalClientException {
        if (this.channel == null) {
            return;
        }
        try {
            this.channel.close();
        }
        catch (IOException closeFailure) {
            logger.warn("exception on closing channel:{} \n {}", (Object)this.channel, (Object)closeFailure);
        }
        this.channel = null;
    }

    /**
     * Subscribes with an empty filter string by delegating to {@link #subscribe(String)}.
     *
     * @throws CanalClientException if the subscription request fails
     */
    @Override
    public void subscribe() throws CanalClientException {
        this.subscribe("");
    }

    /**
     * Sends a SUBSCRIPTION packet for this client's destination and waits for the reply.
     *
     * <p>A null {@code filter} is sent to the server as an empty string. On success the
     * filter, exactly as passed in, is recorded on the client identity.
     *
     * @param filter filter expression to subscribe with; may be null
     * @throws CanalClientException if the server replies with an error code or I/O fails
     */
    @Override
    public void subscribe(String filter) throws CanalClientException {
        this.waitClientRunning();
        try {
            String wireFilter = filter == null ? "" : filter;
            CanalPacket.Sub request = CanalPacket.Sub.newBuilder()
                .setDestination(this.clientIdentity.getDestination())
                .setClientId(String.valueOf(this.clientIdentity.getClientId()))
                .setFilter(wireFilter)
                .build();
            CanalPacket.Packet envelope = CanalPacket.Packet.newBuilder()
                .setType(CanalPacket.PacketType.SUBSCRIPTION)
                .setBody(request.toByteString())
                .build();
            this.writeWithHeader(this.channel, envelope.toByteArray());

            CanalPacket.Packet reply = CanalPacket.Packet.parseFrom(this.readNextPacket(this.channel));
            CanalPacket.Ack ack = CanalPacket.Ack.parseFrom(reply.getBody());
            if (ack.getErrorCode() > 0) {
                throw new CanalClientException("failed to subscribe with reason: " + ack.getErrorMessage());
            }
            this.clientIdentity.setFilter(filter);
        }
        catch (IOException e) {
            throw new CanalClientException((Throwable)e);
        }
    }

    /**
     * Sends an UNSUBSCRIPTION packet for this client's destination and waits for the reply.
     *
     * @throws CanalClientException if the server replies with an error code or I/O fails
     */
    @Override
    public void unsubscribe() throws CanalClientException {
        this.waitClientRunning();
        try {
            CanalPacket.Unsub request = CanalPacket.Unsub.newBuilder()
                .setDestination(this.clientIdentity.getDestination())
                .setClientId(String.valueOf(this.clientIdentity.getClientId()))
                .build();
            CanalPacket.Packet envelope = CanalPacket.Packet.newBuilder()
                .setType(CanalPacket.PacketType.UNSUBSCRIPTION)
                .setBody(request.toByteString())
                .build();
            this.writeWithHeader(this.channel, envelope.toByteArray());

            CanalPacket.Packet reply = CanalPacket.Packet.parseFrom(this.readNextPacket(this.channel));
            CanalPacket.Ack ack = CanalPacket.Ack.parseFrom(reply.getBody());
            if (ack.getErrorCode() > 0) {
                throw new CanalClientException("failed to unSubscribe with reason: " + ack.getErrorMessage());
            }
        }
        catch (IOException e) {
            throw new CanalClientException((Throwable)e);
        }
    }

    /**
     * Fetches a batch and acknowledges it, passing null for both timeout and unit.
     *
     * @param batchSize requested batch size
     * @return the fetched message
     * @throws CanalClientException if fetching or acknowledging fails
     */
    @Override
    public Message get(int batchSize) throws CanalClientException {
        return this.get(batchSize, null, null);
    }

    /**
     * Fetches a batch via {@link #getWithoutAck(int, Long, TimeUnit)} and immediately
     * acknowledges the id of the returned message.
     *
     * @param batchSize requested batch size
     * @param timeout   timeout forwarded to the fetch; may be null
     * @param unit      unit of {@code timeout}; may be null
     * @return the fetched message
     * @throws CanalClientException if fetching or acknowledging fails
     */
    @Override
    public Message get(int batchSize, Long timeout, TimeUnit unit) throws CanalClientException {
        final Message fetched = this.getWithoutAck(batchSize, timeout, unit);
        final long fetchedId = fetched.getId();
        this.ack(fetchedId);
        return fetched;
    }

    /**
     * Fetches a batch without acknowledging it, passing null for both timeout and unit.
     *
     * @param batchSize requested batch size
     * @return the fetched message
     * @throws CanalClientException if the fetch fails
     */
    @Override
    public Message getWithoutAck(int batchSize) throws CanalClientException {
        return this.getWithoutAck(batchSize, null, null);
    }

    /**
     * Sends a GET packet (auto-ack disabled) and returns the server's reply as a message.
     *
     * <p>Argument normalisation: a non-positive {@code batchSize} becomes 1000, a null or
     * negative {@code timeout} becomes -1, and a null {@code unit} becomes
     * {@link TimeUnit#MILLISECONDS}. The unit is transmitted as its ordinal.
     *
     * @param batchSize requested batch size
     * @param timeout   fetch timeout; may be null
     * @param unit      unit of {@code timeout}; may be null
     * @return the message built from the server's reply
     * @throws CanalClientException if I/O fails or the server replies with something other than messages
     */
    @Override
    public Message getWithoutAck(int batchSize, Long timeout, TimeUnit unit) throws CanalClientException {
        this.waitClientRunning();
        final int fetchSize = batchSize > 0 ? batchSize : 1000;
        final long waitTime = (timeout != null && timeout >= 0L) ? timeout : -1L;
        final TimeUnit waitUnit = unit != null ? unit : TimeUnit.MILLISECONDS;
        try {
            CanalPacket.Get request = CanalPacket.Get.newBuilder()
                .setAutoAck(false)
                .setDestination(this.clientIdentity.getDestination())
                .setClientId(String.valueOf(this.clientIdentity.getClientId()))
                .setFetchSize(fetchSize)
                .setTimeout(waitTime)
                .setUnit(waitUnit.ordinal())
                .build();
            CanalPacket.Packet envelope = CanalPacket.Packet.newBuilder()
                .setType(CanalPacket.PacketType.GET)
                .setBody(request.toByteString())
                .build();
            this.writeWithHeader(this.channel, envelope.toByteArray());
            return this.receiveMessages();
        }
        catch (IOException e) {
            throw new CanalClientException((Throwable)e);
        }
    }

    /**
     * Reads the next packet and converts it into a {@link Message}.
     *
     * <p>A MESSAGES packet must be uncompressed; each contained byte string is parsed as
     * an entry and added to the result, whose id is the batch id of the packet. An ACK
     * packet here is treated as an error report from the server. Any other packet type is
     * rejected.
     *
     * @return the message carrying the batch id and parsed entries
     * @throws CanalClientException for compressed payloads, ACK replies and unexpected packet types
     * @throws IOException if reading or parsing the packet fails
     */
    private Message receiveMessages() throws InvalidProtocolBufferException, IOException {
        CanalPacket.Packet packet = CanalPacket.Packet.parseFrom(this.readNextPacket(this.channel));
        CanalPacket.PacketType type = packet.getType();

        if (type == CanalPacket.PacketType.MESSAGES) {
            if (!packet.getCompression().equals(CanalPacket.Compression.NONE)) {
                throw new CanalClientException("compression is not supported in this connector");
            }
            CanalPacket.Messages batch = CanalPacket.Messages.parseFrom(packet.getBody());
            Message result = new Message(batch.getBatchId());
            for (ByteString rawEntry : batch.getMessagesList()) {
                result.addEntry(CanalEntry.Entry.parseFrom(rawEntry));
            }
            return result;
        }

        if (type == CanalPacket.PacketType.ACK) {
            CanalPacket.Ack ack = CanalPacket.Ack.parseFrom(packet.getBody());
            throw new CanalClientException("something goes wrong with reason: " + ack.getErrorMessage());
        }

        throw new CanalClientException("unexpected packet type: " + type);
    }

    /**
     * Sends a CLIENTACK packet for the given batch; no reply is read.
     *
     * @param batchId id of the batch to acknowledge
     * @throws CanalClientException if writing the packet fails
     */
    @Override
    public void ack(long batchId) throws CanalClientException {
        this.waitClientRunning();
        CanalPacket.ClientAck request = CanalPacket.ClientAck.newBuilder()
            .setDestination(this.clientIdentity.getDestination())
            .setClientId(String.valueOf(this.clientIdentity.getClientId()))
            .setBatchId(batchId)
            .build();
        try {
            CanalPacket.Packet envelope = CanalPacket.Packet.newBuilder()
                .setType(CanalPacket.PacketType.CLIENTACK)
                .setBody(request.toByteString())
                .build();
            this.writeWithHeader(this.channel, envelope.toByteArray());
        }
        catch (IOException e) {
            throw new CanalClientException((Throwable)e);
        }
    }

    /**
     * Sends a CLIENTROLLBACK packet for the given batch; no reply is read.
     *
     * @param batchId id of the batch to roll back
     * @throws CanalClientException if writing the packet fails
     */
    @Override
    public void rollback(long batchId) throws CanalClientException {
        this.waitClientRunning();
        CanalPacket.ClientRollback request = CanalPacket.ClientRollback.newBuilder()
            .setDestination(this.clientIdentity.getDestination())
            .setClientId(String.valueOf(this.clientIdentity.getClientId()))
            .setBatchId(batchId)
            .build();
        try {
            CanalPacket.Packet envelope = CanalPacket.Packet.newBuilder()
                .setType(CanalPacket.PacketType.CLIENTROLLBACK)
                .setBody(request.toByteString())
                .build();
            this.writeWithHeader(this.channel, envelope.toByteArray());
        }
        catch (IOException e) {
            throw new CanalClientException((Throwable)e);
        }
    }

    /**
     * Issues a rollback with batch id 0 by delegating to {@link #rollback(long)}.
     *
     * @throws CanalClientException if the rollback request fails
     */
    @Override
    public void rollback() throws CanalClientException {
        // rollback(long) performs the same wait again, so this call is repeated there.
        this.waitClientRunning();
        this.rollback(0L);
    }

    /**
     * Writes one length-prefixed packet: a 4-byte big-endian body length followed by the body.
     *
     * <p>Writes are serialised on {@code writeDataLock} because the header buffer is shared.
     * Each buffer is written in a loop until it is drained, since a single
     * {@code channel.write} call is not guaranteed to consume the whole buffer and a short
     * write would corrupt the framing for every following packet.
     *
     * @param channel channel to write to
     * @param body    packet payload
     * @throws IOException if the channel write fails
     */
    private void writeWithHeader(SocketChannel channel, byte[] body) throws IOException {
        synchronized (this.writeDataLock) {
            this.writeHeader.clear();
            this.writeHeader.putInt(body.length);
            this.writeHeader.flip();
            while (this.writeHeader.hasRemaining()) {
                channel.write(this.writeHeader);
            }
            ByteBuffer payload = ByteBuffer.wrap(body);
            while (payload.hasRemaining()) {
                channel.write(payload);
            }
        }
    }

    /**
     * Reads one length-prefixed packet: a 4-byte big-endian body length followed by the body.
     *
     * <p>Reads are serialised on {@code readDataLock} because the header buffer is shared.
     * A negative length read from the wire is reported as an {@link IOException} instead
     * of reaching {@code ByteBuffer.allocate}, whose unchecked
     * {@code IllegalArgumentException} would bypass the callers' I/O error handling.
     *
     * @param channel channel to read from
     * @return the packet body bytes
     * @throws IOException if the stream ends early, the read fails or the length prefix is negative
     */
    private byte[] readNextPacket(SocketChannel channel) throws IOException {
        synchronized (this.readDataLock) {
            this.readHeader.clear();
            this.read(channel, this.readHeader);
            int bodyLen = this.readHeader.getInt(0);
            if (bodyLen < 0) {
                throw new IOException("invalid packet body length: " + bodyLen);
            }
            ByteBuffer bodyBuf = ByteBuffer.allocate(bodyLen).order(ByteOrder.BIG_ENDIAN);
            this.read(channel, bodyBuf);
            return bodyBuf.array();
        }
    }

    /**
     * Reads from the channel until the buffer has no space remaining.
     *
     * @param channel channel to read from
     * @param buffer  buffer to fill
     * @throws IOException if the channel reports end of stream before the buffer is full
     */
    private void read(SocketChannel channel, ByteBuffer buffer) throws IOException {
        while (buffer.hasRemaining()) {
            if (channel.read(buffer) == -1) {
                throw new IOException("end of stream when reading header");
            }
        }
    }

    /**
     * Creates and wires the running monitor once, provided a ZkClientx and a client identity
     * are available and no monitor exists yet.
     *
     * <p>The registered listener connects and releases the mutex when this client becomes
     * active, and closes the channel when it stops being active.
     *
     * @param clientIdentity identity supplying the client id and destination for the monitor
     */
    private synchronized void initClientRunningMonitor(ClientIdentity clientIdentity) {
        if (this.zkClientx == null || clientIdentity == null || this.runningMonitor != null) {
            return;
        }

        ClientRunningData runningData = new ClientRunningData();
        runningData.setClientId(clientIdentity.getClientId());
        runningData.setAddress(AddressUtils.getHostIp());

        final ClientRunningMonitor monitor = new ClientRunningMonitor();
        this.runningMonitor = monitor;
        monitor.setDestination(clientIdentity.getDestination());
        monitor.setZkClient(this.zkClientx);
        monitor.setClientData(runningData);
        monitor.setListener(new ClientRunningListener(){

            @Override
            public InetSocketAddress processActiveEnter() {
                InetSocketAddress localAddress = SimpleCanalConnector.this.doConnect();
                // The mutex must be opened before subscribe/rollback, which both wait on it.
                SimpleCanalConnector.this.mutex.set(Boolean.TRUE);
                if (SimpleCanalConnector.this.filter != null) {
                    SimpleCanalConnector.this.subscribe(SimpleCanalConnector.this.filter);
                }
                if (SimpleCanalConnector.this.rollbackOnConnect) {
                    SimpleCanalConnector.this.rollback();
                }
                return localAddress;
            }

            @Override
            public void processActiveExit() {
                SimpleCanalConnector.this.mutex.set(Boolean.FALSE);
                SimpleCanalConnector.this.doDisconnnect();
            }
        });
    }

    /**
     * Gate applied before each request when a ZkClientx is configured.
     *
     * <p>Does nothing without a ZkClientx. Otherwise it requires the connected flag to be
     * set and then calls {@code get()} on the mutex. If that call is interrupted, the
     * thread's interrupt flag is restored and the interruption is rethrown wrapped.
     *
     * @throws CanalClientException if not connected yet, or if interrupted while waiting
     */
    private void waitClientRunning() {
        if (this.zkClientx == null) {
            return;
        }
        if (!this.connected) {
            throw new CanalClientException("should connect first");
        }
        try {
            this.mutex.get();
        }
        catch (InterruptedException interrupted) {
            Thread.currentThread().interrupt();
            throw new CanalClientException((Throwable)interrupted);
        }
    }

    /**
     * Reports whether this connector is currently usable.
     *
     * @return the mutex state when a ZkClientx is configured; {@code true} otherwise
     */
    @Override
    public boolean checkValid() {
        return this.zkClientx == null || this.mutex.state();
    }

    /**
     * Fallback address used by {@code doConnect()} when {@link #getAddress()} returns null.
     *
     * @return always {@code null} in this class (presumably meant to be overridden -- confirm)
     */
    public SocketAddress getNextAddress() {
        return null;
    }

    /**
     * @return the server address given to the constructor
     */
    public SocketAddress getAddress() {
        return this.address;
    }

    /**
     * @return the username given to the constructor
     */
    public String getUsername() {
        return this.username;
    }

    /**
     * @return the password given to the constructor
     */
    public String getPassword() {
        return this.password;
    }

    /**
     * @return the configured socket timeout value
     */
    public int getSoTimeout() {
        return this.soTimeout;
    }

    /**
     * Sets the socket timeout; the value is read when {@code doConnect()} opens a channel,
     * so an already open channel is not affected.
     *
     * @param soTimeout new socket timeout value
     */
    public void setSoTimeout(int soTimeout) {
        this.soTimeout = soTimeout;
    }

    /**
     * Stores the ZooKeeper client and creates the running monitor if none exists yet.
     *
     * @param zkClientx ZooKeeper client to use; a null value leaves the monitor uncreated
     */
    public void setZkClientx(ZkClientx zkClientx) {
        this.zkClientx = zkClientx;
        this.initClientRunningMonitor(this.clientIdentity);
    }

    /**
     * @param rollbackOnConnect whether a rollback is issued right after connecting
     */
    public void setRollbackOnConnect(boolean rollbackOnConnect) {
        this.rollbackOnConnect = rollbackOnConnect;
    }

    /**
     * @param rollbackOnDisConnect whether {@link #disconnect()} issues a rollback before closing
     */
    public void setRollbackOnDisConnect(boolean rollbackOnDisConnect) {
        this.rollbackOnDisConnect = rollbackOnDisConnect;
    }

    /**
     * Sets the filter that is subscribed automatically after a connection is established.
     *
     * @param filter filter expression; null disables the automatic subscription
     */
    public void setFilter(String filter) {
        this.filter = filter;
    }
}

