/*
 * Decompiled with CFR 0.152.
 */
package org.redisson.command;

import io.netty.buffer.ByteBuf;
import io.netty.buffer.ByteBufAllocator;
import io.netty.buffer.ByteBufUtil;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelFutureListener;
import io.netty.util.ReferenceCountUtil;
import io.netty.util.Timeout;
import io.netty.util.TimerTask;
import io.netty.util.concurrent.GenericFutureListener;
import java.security.MessageDigest;
import java.util.AbstractMap;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.Iterator;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.BiConsumer;
import org.redisson.RedissonReference;
import org.redisson.RedissonShutdownException;
import org.redisson.ScanResult;
import org.redisson.SlotCallback;
import org.redisson.api.RFuture;
import org.redisson.api.RedissonClient;
import org.redisson.api.RedissonReactiveClient;
import org.redisson.api.RedissonRxClient;
import org.redisson.cache.LRUCacheMap;
import org.redisson.cache.ReferenceCacheMap;
import org.redisson.client.RedisAskException;
import org.redisson.client.RedisClient;
import org.redisson.client.RedisConnection;
import org.redisson.client.RedisException;
import org.redisson.client.RedisLoadingException;
import org.redisson.client.RedisMovedException;
import org.redisson.client.RedisRedirectException;
import org.redisson.client.RedisResponseTimeoutException;
import org.redisson.client.RedisTimeoutException;
import org.redisson.client.RedisTryAgainException;
import org.redisson.client.WriteRedisConnectionException;
import org.redisson.client.codec.BaseCodec;
import org.redisson.client.codec.Codec;
import org.redisson.client.codec.StringCodec;
import org.redisson.client.protocol.CommandData;
import org.redisson.client.protocol.CommandsData;
import org.redisson.client.protocol.RedisCommand;
import org.redisson.client.protocol.RedisCommands;
import org.redisson.client.protocol.ScoredEntry;
import org.redisson.client.protocol.decoder.ListScanResult;
import org.redisson.client.protocol.decoder.MapScanResult;
import org.redisson.codec.ReferenceCodecProvider;
import org.redisson.command.AsyncDetails;
import org.redisson.command.CommandAsyncExecutor;
import org.redisson.config.Config;
import org.redisson.config.MasterSlaveServersConfig;
import org.redisson.connection.ConnectionManager;
import org.redisson.connection.MasterSlaveEntry;
import org.redisson.connection.NodeSource;
import org.redisson.liveobject.core.RedissonObjectBuilder;
import org.redisson.misc.LogHelper;
import org.redisson.misc.RPromise;
import org.redisson.misc.RedissonPromise;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class CommandAsyncService
implements CommandAsyncExecutor {
    /** Logger shared by this class and its anonymous callbacks. */
    static final Logger log = LoggerFactory.getLogger(CommandAsyncService.class);
    /** Connection manager supplied at construction; used for slot calculation, entry lookup, retry timers and the shutdown latch. */
    final ConnectionManager connectionManager;
    /** Created by {@code enableRedissonReferenceSupport(Config)}; {@code null} until reference support is enabled. */
    private RedissonObjectBuilder objectBuilder;
    // At most one of the three client references below is non-null: each
    // enableRedissonReferenceSupport(...) overload sets its own field and clears the other two.
    protected RedissonClient redisson;
    protected RedissonReactiveClient redissonReactive;
    protected RedissonRxClient redissonRx;
    /** Maps a Lua script's text to the hex SHA-1 digest computed by {@code calcSHA}. NOTE(review): 500 is presumably the maximum entry count - confirm against LRUCacheMap. */
    private static final Map<String, String> SHA_CACHE = new LRUCacheMap<String, String>(500, 0L, 0L);
    /** Per-class-loader codec cache; not used in the visible part of this file. NOTE(review): presumably consumed by {@code getCodec} - confirm. */
    private static final Map<ClassLoader, Map<Codec, Codec>> CODECS = ReferenceCacheMap.weak(0L, 0L);

    /**
     * Creates a command service bound to the given connection manager.
     *
     * @param connectionManager manager stored as-is and used by every command issued through this service
     */
    public CommandAsyncService(ConnectionManager connectionManager) {
        this.connectionManager = connectionManager;
    }

    /**
     * Returns the connection manager this service was constructed with.
     */
    @Override
    public ConnectionManager getConnectionManager() {
        return connectionManager;
    }

    /**
     * Enables Redisson reference support for a synchronous client.
     * A {@code null} argument leaves the current state untouched.
     *
     * @param redisson client to bind, may be {@code null}
     * @return this executor
     */
    @Override
    public CommandAsyncExecutor enableRedissonReferenceSupport(RedissonClient redisson) {
        if (redisson == null) {
            return this;
        }
        this.redisson = redisson;
        this.enableRedissonReferenceSupport(redisson.getConfig());
        // Only one client flavour may be active at a time.
        this.redissonReactive = null;
        this.redissonRx = null;
        return this;
    }

    /**
     * Enables Redisson reference support for a reactive client.
     * A {@code null} argument leaves the current state untouched.
     *
     * @param redissonReactive client to bind, may be {@code null}
     * @return this executor
     */
    @Override
    public CommandAsyncExecutor enableRedissonReferenceSupport(RedissonReactiveClient redissonReactive) {
        if (redissonReactive == null) {
            return this;
        }
        this.redissonReactive = redissonReactive;
        this.enableRedissonReferenceSupport(redissonReactive.getConfig());
        // Only one client flavour may be active at a time.
        this.redisson = null;
        this.redissonRx = null;
        return this;
    }

    /**
     * Enables Redisson reference support for an RxJava client.
     * A {@code null} argument leaves the current state untouched.
     *
     * @param redissonRx client to bind, may be {@code null}
     * @return this executor
     */
    @Override
    public CommandAsyncExecutor enableRedissonReferenceSupport(RedissonRxClient redissonRx) {
        if (redissonRx == null) {
            return this;
        }
        this.redissonReactive = null;
        this.enableRedissonReferenceSupport(redissonRx.getConfig());
        this.redisson = null;
        // The Rx client becomes the only active client flavour.
        this.redissonRx = redissonRx;
        return this;
    }

    /**
     * Builds a fresh object builder from the configuration and registers the
     * configured codec with that builder's reference codec provider.
     *
     * @param config configuration of the client being bound
     */
    private void enableRedissonReferenceSupport(Config config) {
        Codec configured = config.getCodec();
        this.objectBuilder = new RedissonObjectBuilder(config);
        ReferenceCodecProvider provider = this.objectBuilder.getReferenceCodecProvider();
        provider.registerCodec(configured.getClass(), configured);
    }

    /**
     * Reports whether any client flavour (sync, reactive or Rx) has been bound.
     */
    @Override
    public boolean isRedissonReferenceSupportEnabled() {
        boolean nothingBound = this.redisson == null && this.redissonReactive == null && this.redissonRx == null;
        return !nothingBound;
    }

    /**
     * Blocks until a subscription future completes.
     *
     * <p>Waits at most {@code timeout + retryInterval * retryAttempts} milliseconds
     * (values read from the connection manager's config). If the future has not
     * completed by then it is failed with a {@link RedisTimeoutException}. The method
     * then calls {@code syncUninterruptibly()} on the future.
     *
     * @param future subscription future; cast to {@link RPromise} when the wait times out
     */
    @Override
    public void syncSubscription(RFuture<?> future) {
        MasterSlaveServersConfig cfg = this.connectionManager.getConfig();
        int timeout = cfg.getTimeout() + cfg.getRetryInterval() * cfg.getRetryAttempts();
        try {
            boolean completedInTime = future.await(timeout);
            if (!completedInTime) {
                ((RPromise)future).tryFailure(new RedisTimeoutException("Subscribe timeout: (" + timeout + "ms). Increase 'subscriptionsPerConnection' and/or 'subscriptionConnectionPoolSize' parameters."));
            }
        }
        catch (InterruptedException interrupted) {
            // Preserve the interrupt status for the caller.
            Thread.currentThread().interrupt();
        }
        future.syncUninterruptibly();
    }

    /**
     * Waits for the future to complete and returns its result.
     *
     * <p>If the waiting thread is interrupted, waiting stops, the interrupt flag is
     * restored, and the future is evaluated in whatever state it is in.
     *
     * @param future future to wait on
     * @return the future's value when it completed successfully
     * @throws RedisException (via {@link #convertException}) when the future is not successful
     */
    @Override
    public <V> V get(RFuture<V> future) {
        if (!future.isDone()) {
            CountDownLatch completion = new CountDownLatch(1);
            future.onComplete((res, e) -> completion.countDown());
            while (!future.isDone()) {
                try {
                    completion.await();
                }
                catch (InterruptedException interrupted) {
                    // Stop waiting but keep the interrupt visible to the caller.
                    Thread.currentThread().interrupt();
                    break;
                }
            }
        }
        if (!future.isSuccess()) {
            throw this.convertException(future);
        }
        return future.getNow();
    }

    /**
     * Waits up to the given time for the future to complete.
     *
     * @param future      future to wait on
     * @param timeout     maximum time to wait
     * @param timeoutUnit unit of {@code timeout}
     * @return {@code true} if the completion callback fired before the timeout elapsed
     * @throws InterruptedException if the waiting thread is interrupted
     */
    @Override
    public boolean await(RFuture<?> future, long timeout, TimeUnit timeoutUnit) throws InterruptedException {
        CountDownLatch completion = new CountDownLatch(1);
        future.onComplete((res, e) -> completion.countDown());
        boolean completed = completion.await(timeout, timeoutUnit);
        return completed;
    }

    /**
     * Factory for the promise handed back to callers; overridable by subclasses.
     *
     * @return a new, uncompleted promise
     */
    protected <R> RPromise<R> createPromise() {
        RPromise<R> promise = new RedissonPromise();
        return promise;
    }

    /**
     * Issues a read command against a specific client of the given entry.
     *
     * @return future completed with the command result
     */
    @Override
    public <T, R> RFuture<R> readAsync(RedisClient client, MasterSlaveEntry entry, Codec codec, RedisCommand<T> command, Object ... params) {
        RPromise<R> result = this.createPromise();
        NodeSource target = new NodeSource(entry, client);
        this.async(true, target, codec, command, params, result, 0, false);
        return result;
    }

    /**
     * Issues a read command against a specific client, using the slot calculated
     * from the object name.
     *
     * @return future completed with the command result
     */
    @Override
    public <T, R> RFuture<R> readAsync(RedisClient client, String name, Codec codec, RedisCommand<T> command, Object ... params) {
        RPromise<R> result = this.createPromise();
        NodeSource target = new NodeSource(this.connectionManager.calcSlot(name), client);
        this.async(true, target, codec, command, params, result, 0, false);
        return result;
    }

    /**
     * Issues a read command against a specific client, using the slot calculated
     * from the binary key.
     *
     * @return future completed with the command result
     */
    @Override
    public <T, R> RFuture<R> readAsync(RedisClient client, byte[] key, Codec codec, RedisCommand<T> command, Object ... params) {
        RPromise<R> result = this.createPromise();
        NodeSource target = new NodeSource(this.connectionManager.calcSlot(key), client);
        this.async(true, target, codec, command, params, result, 0, false);
        return result;
    }

    /**
     * Issues a read command against a specific client with no slot or entry hint.
     *
     * @return future completed with the command result
     */
    @Override
    public <T, R> RFuture<R> readAsync(RedisClient client, Codec codec, RedisCommand<T> command, Object ... params) {
        RPromise<R> result = this.createPromise();
        NodeSource target = new NodeSource(client);
        this.async(true, target, codec, command, params, result, 0, false);
        return result;
    }

    /**
     * Runs a read command on every entry and gathers the results into a new list.
     *
     * @return future completed with the combined results
     */
    @Override
    public <T, R> RFuture<Collection<R>> readAllAsync(Codec codec, RedisCommand<T> command, Object ... params) {
        List<R> collected = new ArrayList<R>();
        return this.readAllAsync(collected, codec, command, params);
    }

    /**
     * Runs a read command on every entry with the connection manager's default
     * codec and gathers the results into a new list.
     *
     * @return future completed with the combined results
     */
    @Override
    public <T, R> RFuture<Collection<R>> readAllAsync(RedisCommand<T> command, Object ... params) {
        List<R> collected = new ArrayList<R>();
        Codec defaultCodec = this.connectionManager.getCodec();
        return this.readAllAsync(collected, defaultCodec, command, params);
    }

    /**
     * Runs a read command on every entry and accumulates the per-entry results
     * into the supplied collection.
     *
     * <p>A per-entry result that is itself a {@link Collection} is merged with
     * {@code addAll}; any other result is added as a single element. The returned
     * future succeeds with {@code results} once every entry has answered, and fails
     * with the first error that is not a {@link RedisRedirectException}.
     *
     * @param results collection that receives the results; guarded by {@code synchronized (results)}
     * @param codec   codec used for the command
     * @param command command to execute on each entry
     * @param params  command parameters
     * @return future completed with {@code results}
     */
    @Override
    public <T, R> RFuture<Collection<R>> readAllAsync(final Collection<R> results, Codec codec, RedisCommand<T> command, Object ... params) {
        final RPromise<Collection<R>> mainPromise = this.createPromise();
        Collection<MasterSlaveEntry> nodes = this.connectionManager.getEntrySet();
        final AtomicInteger counter = new AtomicInteger(nodes.size());
        BiConsumer<Object, Throwable> listener = new BiConsumer<Object, Throwable>(){

            @Override
            @SuppressWarnings("unchecked")
            public void accept(Object result, Throwable u) {
                if (u != null && !(u instanceof RedisRedirectException)) {
                    mainPromise.tryFailure(u);
                    return;
                }
                // Listeners may run concurrently, one per entry, so writes are serialized on the target collection.
                synchronized (results) {
                    if (result instanceof Collection) {
                        results.addAll((Collection<? extends R>)result);
                    } else {
                        // Only non-collection results are added as a single element;
                        // a collection result must not be added a second time as a nested element.
                        results.add((R)result);
                    }
                }
                if (counter.decrementAndGet() == 0 && !mainPromise.isDone()) {
                    mainPromise.trySuccess(results);
                }
            }
        };
        for (MasterSlaveEntry entry : nodes) {
            RedissonPromise promise = new RedissonPromise();
            promise.onComplete(listener);
            this.async(true, new NodeSource(entry), codec, command, params, promise, 0, true);
        }
        return mainPromise;
    }

    /**
     * Runs a read command against the entries in random order, moving on to the
     * next entry while the result is {@code null}.
     *
     * @return future completed with the first non-null result, or {@code null} if every entry returned {@code null}
     */
    @Override
    public <T, R> RFuture<R> readRandomAsync(Codec codec, RedisCommand<T> command, Object ... params) {
        RPromise<R> result = this.createPromise();
        // Work on a private, shuffled copy: retryReadRandomAsync removes entries as it goes.
        List<MasterSlaveEntry> candidates = new ArrayList<MasterSlaveEntry>(this.connectionManager.getEntrySet());
        Collections.shuffle(candidates);
        this.retryReadRandomAsync(codec, command, result, candidates, params);
        return result;
    }

    /**
     * Runs a read command against the single given entry through the same retry
     * path as the multi-entry variant.
     *
     * @param entry   the only entry to query
     * @param codec   codec used for the command
     * @param command command to execute
     * @param params  command parameters
     * @return future completed with the command result
     */
    @Override
    public <T, R> RFuture<R> readRandomAsync(MasterSlaveEntry entry, Codec codec, RedisCommand<T> command, Object ... params) {
        RPromise<R> mainPromise = this.createPromise();
        // retryReadRandomAsync consumes the list with nodes.remove(0), so the list must be
        // mutable; an immutable singleton list would throw UnsupportedOperationException.
        List<MasterSlaveEntry> nodes = new ArrayList<MasterSlaveEntry>(1);
        nodes.add(entry);
        this.retryReadRandomAsync(codec, command, mainPromise, nodes, params);
        return mainPromise;
    }

    /**
     * Takes the first entry off {@code nodes}, runs the read command on it and:
     * fails {@code mainPromise} on error, succeeds it on a non-null result, and
     * otherwise recurses onto the remaining entries (succeeding with {@code null}
     * once none remain).
     *
     * @param nodes remaining candidate entries; this method removes the head element
     */
    private <R, T> void retryReadRandomAsync(Codec codec, RedisCommand<T> command, RPromise<R> mainPromise, List<MasterSlaveEntry> nodes, Object ... params) {
        RedissonPromise attemptPromise = new RedissonPromise();
        attemptPromise.onComplete((res, e) -> {
            if (e != null) {
                mainPromise.tryFailure((Throwable)e);
                return;
            }
            if (res != null) {
                mainPromise.trySuccess(res);
                return;
            }
            if (!nodes.isEmpty()) {
                // Null answer from this entry: try the next candidate.
                this.retryReadRandomAsync(codec, command, mainPromise, nodes, params);
                return;
            }
            mainPromise.trySuccess(null);
        });
        MasterSlaveEntry next = nodes.remove(0);
        NodeSource target = new NodeSource(next);
        this.async(true, target, codec, command, params, attemptPromise, 0, false);
    }

    /**
     * Runs a write command on every entry without collecting results.
     *
     * @return future completed once every entry has answered
     */
    @Override
    public <T> RFuture<Void> writeAllAsync(RedisCommand<T> command, Object ... params) {
        // A null callback means per-entry results are discarded.
        return writeAllAsync(command, null, params);
    }

    /**
     * Runs a write command on every entry with the connection manager's default
     * codec, feeding each result to {@code callback}.
     *
     * @return future completed with {@code callback.onFinish()}, or {@code null} when no callback is given
     */
    @Override
    public <R, T> RFuture<R> writeAllAsync(RedisCommand<T> command, SlotCallback<T, R> callback, Object ... params) {
        Codec defaultCodec = this.connectionManager.getCodec();
        return this.allAsync(false, defaultCodec, command, callback, params);
    }

    /**
     * Runs a write command on every entry with an explicit codec, feeding each
     * result to {@code callback}.
     *
     * @return future completed with {@code callback.onFinish()}, or {@code null} when no callback is given
     */
    @Override
    public <R, T> RFuture<R> writeAllAsync(Codec codec, RedisCommand<T> command, SlotCallback<T, R> callback, Object ... params) {
        RFuture<R> outcome = this.allAsync(false, codec, command, callback, params);
        return outcome;
    }

    /**
     * Runs a read command on every entry with the connection manager's default
     * codec, feeding each result to {@code callback}.
     *
     * @return future completed with {@code callback.onFinish()}, or {@code null} when no callback is given
     */
    @Override
    public <R, T> RFuture<R> readAllAsync(RedisCommand<T> command, SlotCallback<T, R> callback, Object ... params) {
        Codec defaultCodec = this.connectionManager.getCodec();
        return this.allAsync(true, defaultCodec, command, callback, params);
    }

    /**
     * Executes a command on every entry and aggregates the answers through an
     * optional callback.
     *
     * <p>The first error that is not a {@link RedisRedirectException} fails the
     * returned future. On a redirect the result is passed through the command's
     * convertor before being handed to the callback. When the last entry answers,
     * the future succeeds with {@code callback.onFinish()} or with {@code null}
     * if there is no callback.
     *
     * @param readOnlyMode whether the command is dispatched as a read
     * @param callback     per-entry result sink, may be {@code null}
     */
    private <T, R> RFuture<R> allAsync(boolean readOnlyMode, Codec codec, final RedisCommand<T> command, final SlotCallback<T, R> callback, Object ... params) {
        final RedissonPromise mainPromise = new RedissonPromise();
        Collection<MasterSlaveEntry> entries = this.connectionManager.getEntrySet();
        final AtomicInteger pending = new AtomicInteger(entries.size());
        BiConsumer listener = new BiConsumer<T, Throwable>(){

            @Override
            public void accept(T result, Throwable u) {
                boolean redirected = u instanceof RedisRedirectException;
                if (u != null && !redirected) {
                    mainPromise.tryFailure(u);
                    return;
                }
                if (redirected) {
                    result = command.getConvertor().convert(result);
                }
                if (callback != null) {
                    callback.onSlotResult(result);
                }
                if (pending.decrementAndGet() != 0) {
                    return;
                }
                // Last entry answered: publish the aggregate (or null without a callback).
                if (callback == null) {
                    mainPromise.trySuccess(null);
                } else {
                    mainPromise.trySuccess(callback.onFinish());
                }
            }
        };
        for (MasterSlaveEntry entry : entries) {
            RedissonPromise perEntry = new RedissonPromise();
            perEntry.onComplete(listener);
            this.async(readOnlyMode, new NodeSource(entry), codec, command, params, perEntry, 0, true);
        }
        return mainPromise;
    }

    /**
     * Maps a future's failure cause to a {@link RedisException}.
     *
     * @param future future whose cause is inspected
     * @return the cause itself when it already is a {@code RedisException},
     *         otherwise a new {@code RedisException} wrapping it
     */
    @Override
    public <V> RedisException convertException(RFuture<V> future) {
        Throwable cause = future.cause();
        if (cause instanceof RedisException) {
            return (RedisException)cause;
        }
        return new RedisException("Unexpected exception while processing command", cause);
    }

    /**
     * Resolves the entry responsible for a string key: key -> slot -> entry.
     */
    private NodeSource getNodeSource(String key) {
        MasterSlaveEntry owner = this.connectionManager.getEntry(this.connectionManager.calcSlot(key));
        return new NodeSource(owner);
    }

    /**
     * Resolves the entry responsible for a binary key: key -> slot -> entry.
     */
    private NodeSource getNodeSource(byte[] key) {
        MasterSlaveEntry owner = this.connectionManager.getEntry(this.connectionManager.calcSlot(key));
        return new NodeSource(owner);
    }

    /**
     * Issues a read command on the entry that owns the given string key.
     *
     * @return future completed with the command result
     */
    @Override
    public <T, R> RFuture<R> readAsync(String key, Codec codec, RedisCommand<T> command, Object ... params) {
        RPromise<R> result = this.createPromise();
        this.async(true, this.getNodeSource(key), codec, command, params, result, 0, false);
        return result;
    }

    /**
     * Issues a read command on the entry that owns the given binary key.
     *
     * @return future completed with the command result
     */
    @Override
    public <T, R> RFuture<R> readAsync(byte[] key, Codec codec, RedisCommand<T> command, Object ... params) {
        RPromise<R> result = this.createPromise();
        this.async(true, this.getNodeSource(key), codec, command, params, result, 0, false);
        return result;
    }

    /**
     * Issues a read command on the given entry.
     *
     * @return future completed with the command result
     */
    @Override
    public <T, R> RFuture<R> readAsync(MasterSlaveEntry entry, Codec codec, RedisCommand<T> command, Object ... params) {
        RPromise<R> result = this.createPromise();
        NodeSource target = new NodeSource(entry);
        this.async(true, target, codec, command, params, result, 0, false);
        return result;
    }

    /**
     * Issues a write command on the given entry.
     *
     * @return future completed with the command result
     */
    @Override
    public <T, R> RFuture<R> writeAsync(MasterSlaveEntry entry, Codec codec, RedisCommand<T> command, Object ... params) {
        RPromise<R> result = this.createPromise();
        NodeSource target = new NodeSource(entry);
        this.async(false, target, codec, command, params, result, 0, false);
        return result;
    }

    /**
     * Issues a read command on the entry that owns the key, using the connection
     * manager's default codec.
     *
     * @return future completed with the command result
     */
    @Override
    public <T, R> RFuture<R> readAsync(String key, RedisCommand<T> command, Object ... params) {
        Codec defaultCodec = this.connectionManager.getCodec();
        return this.readAsync(key, defaultCodec, command, params);
    }

    /**
     * Evaluates a script in read mode on the entry that owns the given key.
     *
     * @return future completed with the script result
     */
    @Override
    public <T, R> RFuture<R> evalReadAsync(String key, Codec codec, RedisCommand<T> evalCommandType, String script, List<Object> keys, Object ... params) {
        return this.evalAsync(this.getNodeSource(key), true, codec, evalCommandType, script, keys, params);
    }

    /**
     * Evaluates a script in read mode on the given entry.
     *
     * @return future completed with the script result
     */
    @Override
    public <T, R> RFuture<R> evalReadAsync(MasterSlaveEntry entry, Codec codec, RedisCommand<T> evalCommandType, String script, List<Object> keys, Object ... params) {
        NodeSource target = new NodeSource(entry);
        return this.evalAsync(target, true, codec, evalCommandType, script, keys, params);
    }

    /**
     * Evaluates a script in read mode on a specific client, using the slot
     * calculated from the object name.
     *
     * @return future completed with the script result
     */
    @Override
    public <T, R> RFuture<R> evalReadAsync(RedisClient client, String name, Codec codec, RedisCommand<T> evalCommandType, String script, List<Object> keys, Object ... params) {
        NodeSource target = new NodeSource(this.connectionManager.calcSlot(name), client);
        return this.evalAsync(target, true, codec, evalCommandType, script, keys, params);
    }

    /**
     * Evaluates a script in write mode on the entry that owns the given key.
     *
     * @return future completed with the script result
     */
    @Override
    public <T, R> RFuture<R> evalWriteAsync(String key, Codec codec, RedisCommand<T> evalCommandType, String script, List<Object> keys, Object ... params) {
        return this.evalAsync(this.getNodeSource(key), false, codec, evalCommandType, script, keys, params);
    }

    /**
     * Evaluates a script in write mode on the given entry.
     *
     * @return future completed with the script result
     */
    @Override
    public <T, R> RFuture<R> evalWriteAsync(MasterSlaveEntry entry, Codec codec, RedisCommand<T> evalCommandType, String script, List<Object> keys, Object ... params) {
        NodeSource target = new NodeSource(entry);
        return this.evalAsync(target, false, codec, evalCommandType, script, keys, params);
    }

    /**
     * Evaluates a script in write mode on every entry, aggregating results
     * through {@code callback}.
     *
     * @return future completed with {@code callback.onFinish()}
     */
    @Override
    public <T, R> RFuture<R> evalWriteAllAsync(RedisCommand<T> command, SlotCallback<T, R> callback, String script, List<Object> keys, Object ... params) {
        RFuture<R> outcome = this.evalAllAsync(false, command, callback, script, keys, params);
        return outcome;
    }

    /**
     * Evaluates a script on every entry with the connection manager's default codec.
     *
     * <p>The argument list sent to each entry is {@code script, keys.size(), keys..., params...}.
     * Each per-entry result is passed to {@code callback.onSlotResult}; when the last
     * entry answers, the future succeeds with {@code callback.onFinish()}. The first
     * error that is not a {@link RedisRedirectException} fails the future.
     *
     * @param readOnlyMode whether the command is dispatched as a read
     * @param callback     per-entry result sink; dereferenced unconditionally
     * @return future completed with the aggregated result
     */
    public <T, R> RFuture<R> evalAllAsync(boolean readOnlyMode, RedisCommand<T> command, final SlotCallback<T, R> callback, String script, List<Object> keys, Object ... params) {
        final RedissonPromise mainPromise = new RedissonPromise();
        Collection<MasterSlaveEntry> targets = this.connectionManager.getEntrySet();
        final AtomicInteger pending = new AtomicInteger(targets.size());
        BiConsumer listener = new BiConsumer<T, Throwable>(){

            @Override
            public void accept(T t, Throwable u) {
                boolean fatal = u != null && !(u instanceof RedisRedirectException);
                if (fatal) {
                    mainPromise.tryFailure(u);
                    return;
                }
                callback.onSlotResult(t);
                boolean lastAnswer = pending.decrementAndGet() == 0;
                if (lastAnswer && !mainPromise.isDone()) {
                    mainPromise.trySuccess(callback.onFinish());
                }
            }
        };
        List<Object> scriptArgs = new ArrayList<Object>(2 + keys.size() + params.length);
        scriptArgs.add(script);
        scriptArgs.add(keys.size());
        scriptArgs.addAll(keys);
        scriptArgs.addAll(Arrays.asList(params));
        for (MasterSlaveEntry target : targets) {
            RedissonPromise perEntry = new RedissonPromise();
            perEntry.onComplete(listener);
            // A fresh array per entry, as each dispatch receives its own parameter array.
            this.async(readOnlyMode, new NodeSource(target), this.connectionManager.getCodec(), command, scriptArgs.toArray(), perEntry, 0, true);
        }
        return mainPromise;
    }

    /**
     * Loads a script with {@code SCRIPT LOAD}.
     *
     * <p>With at least one key, the script is loaded on the entry that owns the
     * first key (which is treated as {@code byte[]} or {@code String}). With no
     * keys, it is loaded on every entry and the last reported result is returned.
     *
     * @param keys   script keys; only the first one is used for routing
     * @param script script source
     * @return future completed with the value returned by {@code SCRIPT LOAD}
     */
    private RFuture<String> loadScript(List<Object> keys, String script) {
        if (keys.isEmpty()) {
            return this.writeAllAsync(RedisCommands.SCRIPT_LOAD, new SlotCallback<String, String>(){
                volatile String result;

                @Override
                public void onSlotResult(String result) {
                    this.result = result;
                }

                @Override
                public String onFinish() {
                    return this.result;
                }
            }, script);
        }
        Object routingKey = keys.get(0);
        if (routingKey instanceof byte[]) {
            return this.writeAsync((byte[])routingKey, (Codec)StringCodec.INSTANCE, RedisCommands.SCRIPT_LOAD, script);
        }
        return this.writeAsync((String)routingKey, (Codec)StringCodec.INSTANCE, RedisCommands.SCRIPT_LOAD, script);
    }

    /**
     * Tells whether script caching is switched on in the configuration, in which
     * case {@code EVAL} calls are attempted as {@code EVALSHA} first.
     */
    protected boolean isEvalCacheActive() {
        boolean useScriptCache = getConnectionManager().getCfg().isUseScriptCache();
        return useScriptCache;
    }

    /**
     * Returns the lower-case hex SHA-1 digest of a script, memoized in {@code SHA_CACHE}.
     *
     * <p>The script is encoded as UTF-8 before hashing so that the digest does not
     * depend on the JVM's default charset. NOTE(review): this assumes the script text
     * is transmitted to Redis as UTF-8 by the command encoder - confirm.
     *
     * @param script script source
     * @return hex-encoded SHA-1 digest of the script
     * @throws IllegalStateException if the digest cannot be computed
     */
    private String calcSHA(String script) {
        String digest = SHA_CACHE.get(script);
        if (digest == null) {
            try {
                MessageDigest mdigest = MessageDigest.getInstance("SHA-1");
                // Explicit charset: String.getBytes() would use the platform default.
                byte[] s = mdigest.digest(script.getBytes(java.nio.charset.StandardCharsets.UTF_8));
                digest = ByteBufUtil.hexDump((byte[])s);
                SHA_CACHE.put(script, digest);
            }
            catch (Exception e) {
                throw new IllegalStateException(e);
            }
        }
        return digest;
    }

    /**
     * Produces a copy of a parameter array in which every {@link ByteBuf} is
     * replaced by a newly allocated buffer holding the same readable bytes.
     * The source buffer's reader index is restored after copying; all other
     * parameters are carried over by reference.
     *
     * @param params original parameters
     * @return new array of the same length
     */
    private Object[] copy(Object[] params) {
        Object[] duplicated = new Object[params.length];
        for (int i = 0; i < params.length; i++) {
            Object param = params[i];
            if (param instanceof ByteBuf) {
                ByteBuf source = (ByteBuf)param;
                ByteBuf target = ByteBufAllocator.DEFAULT.buffer(source.readableBytes());
                int savedReaderIndex = source.readerIndex();
                // writeBytes(ByteBuf) advances the source reader index; put it back afterwards.
                target.writeBytes(source);
                source.readerIndex(savedReaderIndex);
                duplicated[i] = target;
            } else {
                duplicated[i] = param;
            }
        }
        return duplicated;
    }

    /**
     * Evaluates a Lua script on the given node.
     *
     * <p>When the script cache is active and the command is {@code EVAL}, the call
     * is first sent as {@code EVALSHA} with the script's SHA-1. If that fails with
     * an error whose message starts with {@code NOSCRIPT}, the script is loaded via
     * {@link #loadScript} and {@code EVALSHA} is retried using a private copy of the
     * parameters taken up front. Any other failure is propagated. On this path the
     * command is dispatched with {@code readOnlyMode = false} regardless of the argument.
     *
     * <p>Otherwise the command is dispatched unchanged with the arguments
     * {@code script, keys.size(), keys..., params...}.
     *
     * @param nodeSource      node to run the script on
     * @param readOnlyMode    whether the non-cached dispatch is a read
     * @param codec           codec used for the command
     * @param evalCommandType command descriptor; its name decides whether caching applies
     * @param script          script source
     * @param keys            script keys
     * @param params          script arguments
     * @return future completed with the script result
     */
    private <T, R> RFuture<R> evalAsync(NodeSource nodeSource, boolean readOnlyMode, Codec codec, RedisCommand<T> evalCommandType, String script, List<Object> keys, Object ... params) {
        if (this.isEvalCacheActive() && evalCommandType.getName().equals("EVAL")) {
            RedissonPromise mainPromise = new RedissonPromise();
            // Copy taken before the first attempt so a retry still has parameters to send.
            Object[] pps = this.copy(params);
            RedissonPromise promise = new RedissonPromise();
            String sha1 = this.calcSHA(script);
            RedisCommand<T> cmd = new RedisCommand<T>(evalCommandType, "EVALSHA");
            ArrayList<Object> args = new ArrayList<Object>(2 + keys.size() + params.length);
            args.add(sha1);
            args.add(keys.size());
            args.addAll(keys);
            args.addAll(Arrays.asList(params));
            this.async(false, nodeSource, codec, cmd, args.toArray(), promise, 0, false);
            promise.onComplete((res, e) -> {
                if (e != null) {
                    // The message can be null (e.g. for a cancellation); treating that as
                    // "not NOSCRIPT" avoids an NPE that would leave mainPromise incomplete
                    // and the copied parameters unreleased.
                    String message = ((Throwable)e).getMessage();
                    if (message != null && message.startsWith("NOSCRIPT")) {
                        RFuture<String> loadFuture = this.loadScript(keys, script);
                        loadFuture.onComplete((r, ex) -> {
                            if (ex != null) {
                                this.free(pps);
                                mainPromise.tryFailure((Throwable)ex);
                                return;
                            }
                            RedisCommand command = new RedisCommand(evalCommandType, "EVALSHA");
                            ArrayList<Object> newargs = new ArrayList<Object>(2 + keys.size() + params.length);
                            newargs.add(sha1);
                            newargs.add(keys.size());
                            newargs.addAll(keys);
                            newargs.addAll(Arrays.asList(pps));
                            this.async(false, nodeSource, codec, command, newargs.toArray(), mainPromise, 0, false);
                        });
                    } else {
                        this.free(pps);
                        mainPromise.tryFailure((Throwable)e);
                    }
                    return;
                }
                this.free(pps);
                mainPromise.trySuccess(res);
            });
            return mainPromise;
        }
        RPromise<R> mainPromise = this.createPromise();
        ArrayList<Object> args = new ArrayList<Object>(2 + keys.size() + params.length);
        args.add(script);
        args.add(keys.size());
        args.addAll(keys);
        args.addAll(Arrays.asList(params));
        this.async(readOnlyMode, nodeSource, codec, evalCommandType, args.toArray(), mainPromise, 0, false);
        return mainPromise;
    }

    /**
     * Issues a write command on the entry that owns the key, using the connection
     * manager's default codec.
     *
     * @return future completed with the command result
     */
    @Override
    public <T, R> RFuture<R> writeAsync(String key, RedisCommand<T> command, Object ... params) {
        Codec defaultCodec = this.connectionManager.getCodec();
        return this.writeAsync(key, defaultCodec, command, params);
    }

    /**
     * Issues a write command on the entry that owns the given string key.
     *
     * @return future completed with the command result
     */
    @Override
    public <T, R> RFuture<R> writeAsync(String key, Codec codec, RedisCommand<T> command, Object ... params) {
        RPromise<R> result = this.createPromise();
        this.async(false, this.getNodeSource(key), codec, command, params, result, 0, false);
        return result;
    }

    /**
     * Issues a write command on the entry that owns the given binary key.
     *
     * @return future completed with the command result
     */
    @Override
    public <T, R> RFuture<R> writeAsync(byte[] key, Codec codec, RedisCommand<T> command, Object ... params) {
        RPromise<R> result = this.createPromise();
        this.async(false, this.getNodeSource(key), codec, command, params, result, 0, false);
        return result;
    }

    /**
     * Core dispatch routine: obtains a connection, sends the command and arranges
     * retries.
     *
     * <p>Flow, as implemented below:
     * <ol>
     *   <li>If {@code mainPromise} is already cancelled, the parameters are freed and nothing is sent.</li>
     *   <li>If the shutdown latch cannot be acquired, the parameters are freed and the promise fails
     *       with {@link RedissonShutdownException}.</li>
     *   <li>A connection is requested and an {@link AsyncDetails} record is initialized for this attempt.</li>
     *   <li>A timer task is scheduled after {@code retryInterval} ms. Depending on the state of the
     *       connection and write futures it records a {@link RedisTimeoutException}, re-arms itself,
     *       fails the attempt once {@code retryAttempts} is reached, or re-invokes this method with
     *       {@code attempt + 1}.</li>
     *   <li>When the connection arrives the command is sent, the write future is observed through
     *       {@code checkWriteFuture}, and {@code releaseConnection} is invoked.</li>
     *   <li>Completion of the attempt promise is handled by {@code checkAttemptFuture}.</li>
     * </ol>
     *
     * @param readOnlyMode   whether the command is a read
     * @param source         node selection for the command
     * @param codec          codec requested by the caller; passed through {@code getCodec}
     * @param command        command to execute
     * @param params         command parameters; freed on the cancellation and shutdown paths
     * @param mainPromise    promise visible to the caller
     * @param attempt        number of the current attempt, starting at 0
     * @param ignoreRedirect forwarded to {@code checkWriteFuture} and {@code checkAttemptFuture}
     */
    public <V, R> void async(boolean readOnlyMode, final NodeSource source, Codec codec, final RedisCommand<V> command, final Object[] params, final RPromise<R> mainPromise, int attempt, final boolean ignoreRedirect) {
        if (mainPromise.isCancelled()) {
            this.free(params);
            return;
        }
        if (!this.connectionManager.getShutdownLatch().acquire()) {
            this.free(params);
            mainPromise.tryFailure(new RedissonShutdownException("Redisson is shutdown"));
            return;
        }
        Codec codecToUse = this.getCodec(codec);
        final AsyncDetails details = AsyncDetails.acquire();
        final RFuture<RedisConnection> connectionFuture = this.getConnection(readOnlyMode, source, command);
        RedissonPromise attemptPromise = new RedissonPromise();
        details.init(connectionFuture, attemptPromise, readOnlyMode, source, codecToUse, command, params, mainPromise, attempt);
        // Propagates a caller-side cancellation to the pending connection request and attempt.
        BiConsumer mainPromiseListener = new BiConsumer<R, Throwable>(){

            @Override
            public void accept(R t, Throwable u) {
                if (mainPromise.isCancelled() && connectionFuture.cancel(false)) {
                    log.debug("Connection obtaining canceled for {}", (Object)command);
                    details.getTimeout().cancel();
                    if (details.getAttemptPromise().cancel(false)) {
                        CommandAsyncService.this.free(params);
                    }
                }
            }
        };
        TimerTask retryTimerTask = new TimerTask(){

            public void run(Timeout t) throws Exception {
                if (details.getAttemptPromise().isDone()) {
                    return;
                }
                if (details.getConnectionFuture().cancel(false)) {
                    // No connection was obtained within the interval.
                    if (details.getException() == null) {
                        details.setException(new RedisTimeoutException("Unable to get connection! Try to increase 'nettyThreads' and 'connection pool' settings or set decodeInExecutor = true and increase 'threads' setting. Node source: " + source + ", command: " + LogHelper.toString(command, details.getParams()) + " after " + details.getAttempt() + " retry attempts"));
                    }
                } else if (details.getConnectionFuture().isSuccess()) {
                    if (details.getWriteFuture() == null || !details.getWriteFuture().isDone()) {
                        // Connected, but the write has not finished yet.
                        if (details.getAttempt() == CommandAsyncService.this.connectionManager.getConfig().getRetryAttempts()) {
                            if (details.getWriteFuture() != null && details.getWriteFuture().cancel(false)) {
                                if (details.getException() == null) {
                                    details.setException(new RedisTimeoutException("Unable to send command! Node source: " + source + ", connection: " + details.getConnectionFuture().getNow() + ", current command in queue: " + details.getConnectionFuture().getNow().getCurrentCommand() + ", command: " + LogHelper.toString(command, details.getParams()) + " after " + CommandAsyncService.this.connectionManager.getConfig().getRetryAttempts() + " retry attempts"));
                                }
                                details.getAttemptPromise().tryFailure(details.getException());
                            }
                            return;
                        }
                        // Re-arm this same task and count the wait as an attempt.
                        details.incAttempt();
                        Timeout timeout = CommandAsyncService.this.connectionManager.newTimeout(this, CommandAsyncService.this.connectionManager.getConfig().getRetryInterval(), TimeUnit.MILLISECONDS);
                        details.setTimeout(timeout);
                        return;
                    }
                    if (details.getWriteFuture().isDone() && details.getWriteFuture().isSuccess()) {
                        return;
                    }
                }
                if (details.getMainPromise().isCancelled()) {
                    if (details.getAttemptPromise().cancel(false)) {
                        CommandAsyncService.this.free(details.getParams());
                        AsyncDetails.release(details);
                    }
                    return;
                }
                if (details.getAttempt() == CommandAsyncService.this.connectionManager.getConfig().getRetryAttempts()) {
                    if (details.getException() == null) {
                        details.setException(new RedisTimeoutException("Unable to send command! Node source: " + source + ", command: " + LogHelper.toString(command, details.getParams()) + " after " + CommandAsyncService.this.connectionManager.getConfig().getRetryAttempts() + " retry attempts"));
                    }
                    details.getAttemptPromise().tryFailure(details.getException());
                    return;
                }
                if (!details.getAttemptPromise().cancel(false)) {
                    return;
                }
                // Start a new attempt with the same details.
                int count = details.getAttempt() + 1;
                if (log.isDebugEnabled()) {
                    log.debug("attempt {} for command {} and params {}", new Object[]{count, details.getCommand(), LogHelper.toString(details.getParams())});
                }
                details.removeMainPromiseListener();
                CommandAsyncService.this.async(details.isReadOnlyMode(), details.getSource(), details.getCodec(), details.getCommand(), details.getParams(), details.getMainPromise(), count, ignoreRedirect);
                AsyncDetails.release(details);
            }
        };
        Timeout timeout = this.connectionManager.newTimeout(retryTimerTask, this.connectionManager.getConfig().getRetryInterval(), TimeUnit.MILLISECONDS);
        details.setTimeout(timeout);
        details.setupMainPromiseListener(mainPromiseListener);
        connectionFuture.onComplete((connection, e) -> {
            if (connectionFuture.isCancelled()) {
                this.connectionManager.getShutdownLatch().release();
                return;
            }
            if (!connectionFuture.isSuccess()) {
                this.connectionManager.getShutdownLatch().release();
                // Only recorded here; the retry timer task reports it later.
                details.setException(this.convertException(connectionFuture));
                return;
            }
            if (details.getAttemptPromise().isDone() || details.getMainPromise().isDone()) {
                this.releaseConnection(source, connectionFuture, details.isReadOnlyMode(), details.getAttemptPromise(), details);
                return;
            }
            this.sendCommand(details, (RedisConnection)connection);
            details.getWriteFuture().addListener((GenericFutureListener)new ChannelFutureListener(){

                @Override
                public void operationComplete(ChannelFuture future) throws Exception {
                    CommandAsyncService.this.checkWriteFuture(details, ignoreRedirect, (RedisConnection)connection);
                }
            });
            this.releaseConnection(source, connectionFuture, details.isReadOnlyMode(), details.getAttemptPromise(), details);
        });
        attemptPromise.onComplete((r, e) -> this.checkAttemptFuture(source, details, attemptPromise, ignoreRedirect));
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    /**
     * Resolves the codec instance to use on the calling thread.
     *
     * <p>A {@code null} codec, a codec whose class is assignable to one of
     * {@code BaseCodec.SKIPPED_CODECS}, or a call made without a thread context
     * class loader yields the argument unchanged. Otherwise the codec is looked up
     * in the per-class-loader cache held in {@code CODECS}; on a miss a copy is
     * created through the {@code (ClassLoader, <codec class>)} constructor and cached.
     * Codecs without such a constructor are cached and returned as-is.
     *
     * @param codec the configured codec, may be {@code null}
     * @return the codec to use for the current thread context class loader
     * @throws IllegalStateException if the copy constructor exists but fails
     */
    protected Codec getCodec(Codec codec) {
        if (codec == null) {
            return null;
        }
        Class<?> codecType = codec.getClass();
        for (Class<?> skipped : BaseCodec.SKIPPED_CODECS) {
            if (skipped.isAssignableFrom(codecType)) {
                return codec;
            }
        }
        ClassLoader contextLoader = Thread.currentThread().getContextClassLoader();
        if (contextLoader == null) {
            return codec;
        }

        // Look up (or lazily create, under the CODECS monitor) the cache for this class loader.
        Map<Codec, Codec> perLoader = CODECS.get(contextLoader);
        if (perLoader == null) {
            synchronized (CODECS) {
                perLoader = CODECS.get(contextLoader);
                if (perLoader == null) {
                    perLoader = new ConcurrentHashMap<Codec, Codec>();
                    CODECS.put(contextLoader, perLoader);
                }
            }
        }

        Codec cached = perLoader.get(codec);
        if (cached != null) {
            return cached;
        }

        Codec created;
        try {
            created = (Codec) codecType.getConstructor(ClassLoader.class, codecType).newInstance(contextLoader, codec);
        } catch (NoSuchMethodException e) {
            // No copy constructor: the original instance is shared.
            created = codec;
        } catch (Exception e) {
            throw new IllegalStateException(e);
        }
        perLoader.put(codec, created);
        return created;
    }

    /**
     * Requests a connection for the given command from the connection manager.
     *
     * @param readOnlyMode {@code true} to request a read connection, {@code false} for a write connection
     * @param source       node the command is addressed to
     * @param command      command that is going to be sent
     * @return future of the acquired connection
     */
    protected <V> RFuture<RedisConnection> getConnection(boolean readOnlyMode, NodeSource source, RedisCommand<V> command) {
        if (readOnlyMode) {
            return this.connectionManager.connectionReadOp(source, command);
        }
        return this.connectionManager.connectionWriteOp(source, command);
    }

    /**
     * Passes every command parameter to {@code ReferenceCountUtil.safeRelease}.
     *
     * @param params command parameters
     */
    protected void free(Object[] params) {
        for (int index = 0; index < params.length; index++) {
            ReferenceCountUtil.safeRelease(params[index]);
        }
    }

    /**
     * Evaluates the write future stored in {@code details} once the channel write has completed.
     *
     * <p>On a failed write a {@link WriteRedisConnectionException} is stored in {@code details};
     * the attempt promise is failed here only when the last configured retry attempt was reached.
     * On a successful write the timeout currently held by {@code details} is cancelled and, unless
     * the command blocks without limit, a new timer is scheduled that either re-issues the command
     * or fails the attempt with a {@link RedisResponseTimeoutException}.
     *
     * @param details        state of the current command attempt
     * @param ignoreRedirect forwarded unchanged to {@code async(...)} when the command is re-issued
     * @param connection     connection the command was written to
     */
    private <V, R> void checkWriteFuture(final AsyncDetails<V, R> details, final boolean ignoreRedirect, final RedisConnection connection) {
        ChannelFuture future = details.getWriteFuture();
        // Nothing to do when the write was cancelled or the attempt already has an outcome.
        if (future.isCancelled() || details.getAttemptPromise().isDone()) {
            return;
        }
        if (!future.isSuccess()) {
            details.setException(new WriteRedisConnectionException("Unable to send command! Node source: " + details.getSource() + ", connection: " + connection + ", command: " + LogHelper.toString(details.getCommand(), details.getParams()) + " after " + details.getAttempt() + " retry attempts", future.cause()));
            // Only the final attempt fails the promise here; the error is logged when the promise could not be failed.
            if (details.getAttempt() == this.connectionManager.getConfig().getRetryAttempts() && !details.getAttemptPromise().tryFailure(details.getException())) {
                log.error(details.getException().getMessage());
            }
            return;
        }
        // The write succeeded: drop the timeout currently registered for this attempt.
        details.getTimeout().cancel();
        long timeoutTime = this.connectionManager.getConfig().getTimeout();
        if (RedisCommands.BLOCKING_COMMAND_NAMES.contains(details.getCommand().getName()) || RedisCommands.BLOCKING_COMMANDS.contains(details.getCommand())) {
            Long popTimeout = null;
            if (RedisCommands.BLOCKING_COMMANDS.contains(details.getCommand())) {
                // The parameter following "BLOCK" is divided by 1000 (integer division).
                // NOTE(review): values below 1000 therefore become 0 and take the
                // "popTimeout == 0" early return below — confirm this is intended.
                boolean found = false;
                for (Object param : details.getParams()) {
                    if (found) {
                        popTimeout = Long.valueOf(param.toString()) / 1000L;
                        break;
                    }
                    if (!"BLOCK".equals(param)) continue;
                    found = true;
                }
            } else {
                // Otherwise the last parameter is parsed as the timeout.
                popTimeout = Long.valueOf(details.getParams()[details.getParams().length - 1].toString());
            }
            // NOTE(review): popTimeout is still null when no "BLOCK" parameter was found;
            // it is unboxed in handleBlockingOperations and in the comparison below,
            // which would throw a NullPointerException.
            this.handleBlockingOperations(details, connection, popTimeout);
            if (popTimeout == 0L) {
                // No response timer is scheduled for a zero timeout.
                return;
            }
            // Extend the response timeout by the blocking period plus one extra second.
            timeoutTime += popTimeout * 1000L;
            timeoutTime += 1000L;
        }
        final long timeoutAmount = timeoutTime;
        TimerTask timeoutTask = new TimerTask(){

            public void run(Timeout timeout) throws Exception {
                if (details.getAttempt() < CommandAsyncService.this.connectionManager.getConfig().getRetryAttempts()) {
                    // Attempts remain: cancel this attempt (bail out if it already completed) and re-issue the command.
                    if (!details.getAttemptPromise().cancel(false)) {
                        return;
                    }
                    int count = details.getAttempt() + 1;
                    if (log.isDebugEnabled()) {
                        log.debug("attempt {} for command {} and params {}", new Object[]{count, details.getCommand(), LogHelper.toString(details.getParams())});
                    }
                    details.removeMainPromiseListener();
                    CommandAsyncService.this.async(details.isReadOnlyMode(), details.getSource(), details.getCodec(), details.getCommand(), details.getParams(), details.getMainPromise(), count, ignoreRedirect);
                    AsyncDetails.release(details);
                    return;
                }
                // No attempts left: report the response timeout on the attempt promise.
                details.getAttemptPromise().tryFailure(new RedisResponseTimeoutException("Redis server response timeout (" + timeoutAmount + " ms) occured after " + CommandAsyncService.this.connectionManager.getConfig().getRetryAttempts() + " retry attempts. Command: " + LogHelper.toString(details.getCommand(), details.getParams()) + ", channel: " + connection.getChannel()));
            }
        };
        Timeout timeout = this.connectionManager.newTimeout(timeoutTask, timeoutTime, TimeUnit.MILLISECONDS);
        details.setTimeout(timeout);
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    /**
     * Installs the extra listeners and the optional timer used for a blocking command.
     *
     * <p>Three things are wired up: a timer (only when {@code popTimeout} is non-zero) that
     * completes the attempt promise with {@code null} and forces a fast reconnect, a listener on
     * the main promise that cleans up when the command finishes or is cancelled, and a listener
     * on the connection manager's shutdown promise that fails the main promise.
     *
     * @param details    state of the current command attempt
     * @param connection connection the blocking command was written to
     * @param popTimeout blocking period in seconds; {@code 0} schedules no timer.
     *                   NOTE(review): the value is unboxed, so {@code null} throws a NullPointerException
     */
    private <R, V> void handleBlockingOperations(final AsyncDetails<V, R> details, final RedisConnection connection, Long popTimeout) {
        // Set once the main promise completes so the shutdown listener below becomes a no-op.
        final AtomicBoolean skip = new AtomicBoolean();
        BiConsumer<Boolean, Throwable> listener = new BiConsumer<Boolean, Throwable>(){

            @Override
            public void accept(Boolean t, Throwable u) {
                if (skip.get()) {
                    return;
                }
                details.getMainPromise().tryFailure(new RedissonShutdownException("Redisson is shutdown"));
            }
        };
        // After popTimeout seconds: complete the attempt with null and, if that succeeded, reconnect.
        Timeout scheduledFuture = popTimeout != 0L ? this.connectionManager.newTimeout(new TimerTask(){

            public void run(Timeout timeout) throws Exception {
                if (details.getAttemptPromise().trySuccess(null)) {
                    connection.forceFastReconnectAsync();
                }
            }
        }, popTimeout, TimeUnit.SECONDS) : null;
        details.getMainPromise().onComplete((res, e) -> {
            if (scheduledFuture != null) {
                scheduledFuture.cancel();
            }
            // Same monitor as the registration block at the end of this method.
            BiConsumer biConsumer = listener;
            synchronized (biConsumer) {
                skip.set(true);
            }
            // Main promise cancelled while the attempt is still pending: reconnect, then cancel the attempt.
            if (details.getMainPromise().isCancelled() && !details.getAttemptPromise().isDone()) {
                log.debug("Canceled blocking operation {} used {}", details.getCommand(), (Object)connection);
                connection.forceFastReconnectAsync().onComplete((r, ex) -> details.getAttemptPromise().cancel(true));
                return;
            }
            // A shutdown failure on the main promise is propagated to the attempt promise.
            if (e instanceof RedissonShutdownException) {
                details.getAttemptPromise().tryFailure((Throwable)e);
            }
        });
        // Register the shutdown listener only while the main promise is still pending.
        BiConsumer<Boolean, Throwable> biConsumer = listener;
        synchronized (biConsumer) {
            if (!details.getMainPromise().isDone()) {
                this.connectionManager.getShutdownPromise().onComplete(listener);
            }
        }
    }

    /**
     * Registers a listener on the attempt promise that hands the connection back once the
     * attempt has completed.
     *
     * <p>Nothing is released when the connection future did not succeed. Otherwise the shutdown
     * latch is released and the connection is returned through {@code releaseRead} or
     * {@code releaseWrite}, depending on {@code isReadOnly}.
     *
     * @param source           node the command was addressed to
     * @param connectionFuture future that produced the connection
     * @param isReadOnly       whether the connection was acquired for a read operation
     * @param attemptPromise   promise of the current attempt
     * @param details          state of the current attempt, used for debug logging only
     */
    protected <V, R> void releaseConnection(NodeSource source, RFuture<RedisConnection> connectionFuture, boolean isReadOnly, RPromise<R> attemptPromise, AsyncDetails<V, R> details) {
        attemptPromise.onComplete((outcome, failure) -> {
            if (connectionFuture.isSuccess()) {
                RedisConnection acquired = connectionFuture.getNow();
                this.connectionManager.getShutdownLatch().release();
                if (!isReadOnly) {
                    this.connectionManager.releaseWrite(source, acquired);
                } else {
                    this.connectionManager.releaseRead(source, acquired);
                }
                if (log.isDebugEnabled()) {
                    Object[] logArgs = new Object[]{details.getCommand(), LogHelper.toString(details.getParams()), details.getSource(), acquired};
                    log.debug("connection released for command {} and params {} from slot {} using connection {}", logArgs);
                }
            }
        });
    }

    /**
     * Processes the outcome of a single command attempt.
     *
     * <p>The timeout held by {@code details} is cancelled first; a cancelled attempt is ignored.
     * Otherwise, in order:
     * <ul>
     *   <li>MOVED (unless {@code ignoreRedirect}): the command is re-issued against the node named
     *       in the exception, or the main promise is failed when {@code source} is itself already
     *       a MOVED redirect;</li>
     *   <li>ASK (unless {@code ignoreRedirect}): the command is re-issued with an ASK redirect;</li>
     *   <li>LOADING / TRYAGAIN while attempts remain: the command is re-issued with the next
     *       attempt number after {@code min(timeout, 1000)} milliseconds;</li>
     *   <li>anything else: the parameters are freed and the main promise is completed through
     *       {@code handleSuccess} or {@code handleError}.</li>
     * </ul>
     *
     * @param source         node the attempt was sent to
     * @param details        state of the attempt
     * @param future         the attempt future that has completed
     * @param ignoreRedirect when {@code true}, MOVED/ASK errors are treated as ordinary failures
     */
    protected <R, V> void checkAttemptFuture(final NodeSource source, final AsyncDetails<V, R> details, RFuture<R> future, final boolean ignoreRedirect) {
        details.getTimeout().cancel();
        if (future.isCancelled()) {
            return;
        }
        try {
            details.removeMainPromiseListener();
            if (future.cause() instanceof RedisMovedException && !ignoreRedirect) {
                RedisMovedException ex = (RedisMovedException)future.cause();
                if (source.getRedirect() == NodeSource.Redirect.MOVED) {
                    details.getMainPromise().tryFailure(new RedisException("MOVED redirection loop detected. Node " + source.getAddr() + " has further redirect to " + ex.getUrl()));
                    return;
                }
                this.async(details.isReadOnlyMode(), new NodeSource(ex.getSlot(), this.connectionManager.applyNatMap(ex.getUrl()), NodeSource.Redirect.MOVED), details.getCodec(), details.getCommand(), details.getParams(), details.getMainPromise(), details.getAttempt(), ignoreRedirect);
                AsyncDetails.release(details);
                return;
            }
            if (future.cause() instanceof RedisAskException && !ignoreRedirect) {
                RedisAskException ex = (RedisAskException)future.cause();
                this.async(details.isReadOnlyMode(), new NodeSource(ex.getSlot(), this.connectionManager.applyNatMap(ex.getUrl()), NodeSource.Redirect.ASK), details.getCodec(), details.getCommand(), details.getParams(), details.getMainPromise(), details.getAttempt(), ignoreRedirect);
                AsyncDetails.release(details);
                return;
            }
            if ((future.cause() instanceof RedisLoadingException || future.cause() instanceof RedisTryAgainException) && details.getAttempt() < this.connectionManager.getConfig().getRetryAttempts()) {
                // The retry runs later on the timer, i.e. after AsyncDetails.release(details)
                // below. Snapshot everything the task needs now so it never reads the holder
                // after it has been handed back.
                final boolean readOnlyMode = details.isReadOnlyMode();
                final Codec codec = details.getCodec();
                final RedisCommand<V> command = details.getCommand();
                final Object[] params = details.getParams();
                final RPromise<R> mainPromise = details.getMainPromise();
                final int nextAttempt = details.getAttempt() + 1;
                this.connectionManager.newTimeout(new TimerTask(){
                    @Override
                    public void run(Timeout timeout) throws Exception {
                        CommandAsyncService.this.async(readOnlyMode, source, codec, command, params, mainPromise, nextAttempt, ignoreRedirect);
                    }
                }, Math.min(this.connectionManager.getConfig().getTimeout(), 1000), TimeUnit.MILLISECONDS);
                AsyncDetails.release(details);
                return;
            }
            // Final outcome: the parameters are no longer needed for a retry.
            this.free(details.getParams());
            if (future.isSuccess()) {
                R res = future.getNow();
                if (res instanceof ScanResult) {
                    // Record which client served the scan on the result itself.
                    ((ScanResult)res).setRedisClient(details.getConnectionFuture().getNow().getRedisClient());
                }
                this.handleSuccess(details, details.getMainPromise(), details.getCommand(), res);
            } else {
                this.handleError(details, details.getMainPromise(), future.cause());
            }
            AsyncDetails.release(details);
        }
        catch (Exception e) {
            // Make sure the caller-facing promise is completed before the exception propagates.
            this.handleError(details, details.getMainPromise(), e);
            throw e;
        }
    }

    /**
     * Fails the main promise with the given cause.
     *
     * @param details     state of the attempt; not used by this implementation
     * @param mainPromise promise to fail
     * @param cause       failure to report
     */
    protected <V, R> void handleError(AsyncDetails<V, R> details, RPromise<R> mainPromise, Throwable cause) {
        // The boolean result of tryFailure is ignored.
        mainPromise.tryFailure(cause);
    }

    /**
     * Completes the promise with a successful result.
     *
     * <p>When Redisson reference support is enabled the result is first passed through
     * {@code handleReference}; otherwise it is delivered unchanged.
     *
     * @param details state of the attempt; not used by this implementation
     * @param promise promise to complete
     * @param command command that produced the result; not used by this implementation
     * @param res     decoded command result
     */
    protected <V, R> void handleSuccess(AsyncDetails<V, R> details, RPromise<R> promise, RedisCommand<?> command, R res) {
        if (!this.isRedissonReferenceSupportEnabled()) {
            promise.trySuccess(res);
            return;
        }
        this.handleReference(promise, res);
    }

    /**
     * Completes the promise with {@code res} after passing it through {@code tryHandleReference}.
     *
     * @param mainPromise promise to complete
     * @param res         decoded command result
     */
    private <R, V> void handleReference(RPromise<R> mainPromise, R res) {
        // tryHandleReference is declared to return Object, so the result has to be cast back
        // to R before it can be handed to RPromise<R>. The cast is unchecked (erased at runtime).
        @SuppressWarnings("unchecked")
        R resolved = (R) this.tryHandleReference(res);
        mainPromise.trySuccess(resolved);
    }

    /**
     * Replaces the Redisson references found in {@code o} with the objects they resolve to.
     *
     * <ul>
     *   <li>{@link List}: elements are replaced in place; the same list is returned.</li>
     *   <li>{@link Set}: the same set is returned when nothing changed. A {@link LinkedHashSet}
     *       with conversions is returned as a new set; any other set is updated in place and a
     *       new set is used only if the in-place update fails.</li>
     *   <li>{@link Map}: entries whose key or value is a {@link RedissonReference} are replaced
     *       in place; the same map is returned.</li>
     *   <li>{@code ListScanResult}: its values are processed; the same result is returned.</li>
     *   <li>{@code MapScanResult}: its map is processed; a new scan result is built only when a
     *       different map instance comes back.</li>
     *   <li>anything else is handled as a single value.</li>
     * </ul>
     *
     * @param o decoded command result, may be {@code null}
     * @return the object with references resolved
     */
    @SuppressWarnings({"unchecked", "rawtypes"})
    protected Object tryHandleReference(Object o) {
        if (o instanceof List) {
            List<Object> list = (List<Object>) o;
            for (int i = 0; i < list.size(); ++i) {
                Object element = list.get(i);
                Object ref = this.tryHandleReference0(element);
                if (ref != element) {
                    list.set(i, ref);
                }
            }
            return o;
        }
        if (o instanceof Set) {
            return this.resolveSetReferences((Set<Object>) o);
        }
        if (o instanceof Map) {
            this.resolveMapReferences((Map<Object, Object>) o);
            return o;
        }
        if (o instanceof ListScanResult) {
            this.tryHandleReference(((ListScanResult) o).getValues());
            return o;
        }
        if (o instanceof MapScanResult) {
            MapScanResult scanResult = (MapScanResult) o;
            Map oldMap = scanResult.getMap();
            Map map = (Map) this.tryHandleReference(oldMap);
            if (map != oldMap) {
                MapScanResult newScanResult = new MapScanResult(scanResult.getPos(), map);
                newScanResult.setRedisClient(scanResult.getRedisClient());
                return newScanResult;
            }
            return o;
        }
        return this.tryHandleReference0(o);
    }

    /**
     * Resolves references inside a set without mutating it while it is being iterated.
     *
     * @param source set to process
     * @return {@code source} itself, or a new set when the result has to be delivered in a copy
     */
    @SuppressWarnings("unchecked")
    private Object resolveSetReferences(Set<Object> source) {
        List<Object> resolved = new ArrayList<Object>(source.size());
        List<Object> stale = new ArrayList<Object>();
        List<Object> fresh = new ArrayList<Object>();
        for (Object element : source) {
            Object ref = this.tryHandleReference0(element);
            resolved.add(ref);
            if (ref != element) {
                stale.add(element);
                fresh.add(ref);
            }
        }
        if (stale.isEmpty()) {
            return source;
        }
        if (!(source instanceof LinkedHashSet)) {
            try {
                // Add first: sets that do not support add fail here, before anything is removed.
                source.addAll(fresh);
                // Only the elements that were actually converted are removed.
                source.removeAll(stale);
                return source;
            } catch (Exception e) {
                // The set cannot be updated in place; deliver the result in a new set instead.
            }
        }
        Set<Object> copy;
        try {
            copy = (Set<Object>) source.getClass().getConstructor().newInstance();
        } catch (Exception e) {
            copy = new LinkedHashSet<Object>();
        }
        copy.addAll(resolved);
        return copy;
    }

    /**
     * Resolves {@link RedissonReference} keys and values of a map in place. Changes are
     * collected during iteration and applied afterwards, so the entry set is never modified
     * while it is being iterated.
     *
     * @param map map to process
     */
    private void resolveMapReferences(Map<Object, Object> map) {
        List<Object> staleKeys = new ArrayList<Object>();
        List<Map.Entry<Object, Object>> updates = new ArrayList<Map.Entry<Object, Object>>();
        for (Map.Entry<Object, Object> entry : map.entrySet()) {
            Object key = entry.getKey();
            Object value = entry.getValue();
            boolean keyIsReference = key instanceof RedissonReference;
            boolean valueIsReference = value instanceof RedissonReference;
            if (!keyIsReference && !valueIsReference) {
                continue;
            }
            if (keyIsReference) {
                staleKeys.add(key);
                key = this.fromReference(key);
            }
            if (valueIsReference) {
                value = this.fromReference(value);
            }
            updates.add(new AbstractMap.SimpleEntry<Object, Object>(key, value));
        }
        for (Object staleKey : staleKeys) {
            map.remove(staleKey);
        }
        for (Map.Entry<Object, Object> update : updates) {
            map.put(update.getKey(), update.getValue());
        }
    }

    /**
     * Resolves a single value.
     *
     * <p>A {@link RedissonReference} is resolved directly. A {@code ScoredEntry} whose value is a
     * reference is rebuilt with the same score and the resolved value. A {@link Map.Entry} has
     * its key and value resolved recursively and is rebuilt only when either of them changed.
     * Every other object is returned unchanged.
     *
     * @param o value to inspect, may be {@code null}
     * @return the resolved value, or {@code o} itself when nothing had to be replaced
     */
    private Object tryHandleReference0(Object o) {
        if (o instanceof RedissonReference) {
            return this.fromReference(o);
        }
        if (o instanceof ScoredEntry) {
            ScoredEntry scored = (ScoredEntry)o;
            if (scored.getValue() instanceof RedissonReference) {
                return new ScoredEntry<Object>(scored.getScore(), this.fromReference(scored.getValue()));
            }
        }
        if (!(o instanceof Map.Entry)) {
            return o;
        }
        Map.Entry entry = (Map.Entry)o;
        Object resolvedKey = this.tryHandleReference0(entry.getKey());
        Object resolvedValue = this.tryHandleReference0(entry.getValue());
        boolean unchanged = resolvedValue == entry.getValue() && resolvedKey == entry.getKey();
        if (unchanged) {
            return o;
        }
        return new AbstractMap.SimpleEntry<Object, Object>(resolvedKey, resolvedValue);
    }

    /**
     * Resolves a reference through the object builder, using the first client facade that is
     * set: {@code redisson}, then {@code redissonReactive}, otherwise {@code redissonRx}.
     *
     * @param res object expected to be a {@link RedissonReference}
     * @return the resolved object, or {@code res} itself when no object builder is configured
     * @throws IllegalStateException wrapping any exception raised while resolving
     */
    private Object fromReference(Object res) {
        if (this.objectBuilder == null) {
            return res;
        }
        try {
            RedissonReference reference = (RedissonReference)res;
            Object resolved;
            if (this.redisson != null) {
                resolved = this.objectBuilder.fromReference(this.redisson, reference);
            } else if (this.redissonReactive != null) {
                resolved = this.objectBuilder.fromReference(this.redissonReactive, reference);
            } else {
                resolved = this.objectBuilder.fromReference(this.redissonRx, reference);
            }
            return resolved;
        }
        catch (Exception failure) {
            throw new IllegalStateException(failure);
        }
    }

    /**
     * Writes the command to the connection and stores the resulting write future in
     * {@code details}.
     *
     * <p>When the source carries an ASK redirect, the command is sent as the second element of a
     * two-command batch whose first element is {@code ASKING}. Otherwise the command is sent on
     * its own.
     *
     * @param details    state of the current attempt
     * @param connection connection to write to
     */
    protected <R, V> void sendCommand(AsyncDetails<V, R> details, RedisConnection connection) {
        boolean askRedirect = details.getSource().getRedirect() == NodeSource.Redirect.ASK;
        if (!askRedirect) {
            if (log.isDebugEnabled()) {
                log.debug("acquired connection for command {} and params {} from slot {} using node {}... {}", new Object[]{details.getCommand(), LogHelper.toString(details.getParams()), details.getSource(), connection.getRedisClient().getAddr(), connection});
            }
            CommandData<V, R> single = new CommandData<V, R>(details.getAttemptPromise(), details.getCodec(), details.getCommand(), details.getParams());
            details.setWriteFuture(connection.send(single));
            return;
        }

        // ASK redirect: prefix the real command with ASKING in a single batch.
        ArrayList batch = new ArrayList(2);
        RedissonPromise askingPromise = new RedissonPromise();
        batch.add(new CommandData(askingPromise, details.getCodec(), RedisCommands.ASKING, new Object[0]));
        batch.add(new CommandData<V, R>(details.getAttemptPromise(), details.getCodec(), details.getCommand(), details.getParams()));
        RedissonPromise<Void> batchPromise = new RedissonPromise<Void>();
        ChannelFuture written = connection.send(new CommandsData(batchPromise, batch, false));
        details.setWriteFuture(written);
    }

    /**
     * Returns the object builder held by this service.
     *
     * @return the {@code objectBuilder} field; {@code fromReference} in this class treats
     *         {@code null} as "no builder configured"
     */
    @Override
    public RedissonObjectBuilder getObjectBuilder() {
        return this.objectBuilder;
    }

    /**
     * Polls the first available element from {@code name} or any of {@code queueNames}.
     *
     * <p>In cluster mode with at least one additional queue, the queues are polled one at a
     * time through {@code poll(...)}. Otherwise a single command is written with all queue
     * names followed by the timeout as its parameters.
     *
     * @param name           first queue name; also used as the key of the single command
     * @param codec          codec for the command
     * @param command        blocking poll command
     * @param secondsTimeout timeout in seconds
     * @param queueNames     additional queue names
     * @return future of the polled value
     */
    @Override
    public <V> RFuture<V> pollFromAnyAsync(String name, Codec codec, RedisCommand<Object> command, long secondsTimeout, String ... queueNames) {
        boolean pollOneByOne = this.connectionManager.isClusterMode() && queueNames.length > 0;
        if (!pollOneByOne) {
            ArrayList<Object> args = new ArrayList<Object>(queueNames.length + 1);
            args.add(name);
            args.addAll(Arrays.asList(queueNames));
            args.add(secondsTimeout);
            return this.writeAsync(name, codec, command, args.toArray());
        }

        ArrayList<String> allNames = new ArrayList<String>();
        allNames.add(name);
        allNames.addAll(Arrays.asList(queueNames));
        // Shared cursor over the queue names and the remaining number of poll rounds.
        AtomicReference<Iterator<String>> cursor = new AtomicReference<Iterator<String>>(allNames.iterator());
        AtomicLong remaining = new AtomicLong(secondsTimeout);
        RedissonPromise<V> outcome = new RedissonPromise<V>();
        this.poll(name, codec, outcome, cursor, allNames, remaining, command);
        return outcome;
    }

    /**
     * Polls the queues one after another until a value arrives, an error occurs, or the
     * counter reaches zero.
     *
     * <p>Each step takes the next name from the shared iterator and writes {@code command} for
     * it with a timeout parameter of {@code 1}. A non-null result or an error completes
     * {@code result}. An empty result decrements {@code counter}: reaching zero completes
     * {@code result} with {@code null}, otherwise polling continues. When the iterator is
     * exhausted it is reset to the start of {@code names}.
     *
     * @param name    first queue name; only passed along to the next step
     * @param codec   codec for the command
     * @param result  promise completed with the final outcome
     * @param ref     shared iterator over {@code names}
     * @param names   all queue names to cycle through
     * @param counter remaining number of empty polls before giving up
     * @param command blocking poll command
     */
    private <V> void poll(String name, Codec codec, RPromise<V> result, AtomicReference<Iterator<String>> ref, List<String> names, AtomicLong counter, RedisCommand<Object> command) {
        if (ref.get().hasNext()) {
            String currentName = ref.get().next();
            // Typed future: the callback then receives (V, Throwable) instead of raw Objects,
            // so the value can be handed to RPromise<V> without casts.
            RFuture<V> future = this.writeAsync(currentName, codec, command, currentName, 1);
            future.onComplete((res, e) -> {
                if (e != null) {
                    result.tryFailure(e);
                    return;
                }
                if (res != null) {
                    result.trySuccess(res);
                    return;
                }
                if (counter.decrementAndGet() == 0L) {
                    result.trySuccess(null);
                    return;
                }
                this.poll(name, codec, result, ref, names, counter, command);
            });
        } else {
            // Every queue was tried once: start over from the first name.
            ref.set(names.iterator());
            this.poll(name, codec, result, ref, names, counter, command);
        }
    }
}

