/*
 * Decompiled with CFR 0.152.
 */
package org.redisson;

import io.netty.buffer.ByteBufUtil;
import io.netty.util.Timeout;
import java.util.Arrays;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
import org.redisson.RedissonExpirable;
import org.redisson.RedissonShutdownException;
import org.redisson.api.RFuture;
import org.redisson.api.RReliableTopic;
import org.redisson.api.StreamMessageId;
import org.redisson.api.listener.MessageListener;
import org.redisson.client.codec.Codec;
import org.redisson.client.codec.StringCodec;
import org.redisson.client.protocol.RedisCommands;
import org.redisson.codec.CompositeCodec;
import org.redisson.command.CommandAsyncExecutor;
import org.redisson.misc.RedissonPromise;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Redis Stream based implementation of {@link RReliableTopic}.
 *
 * <p>Messages are appended to a stream stored under the object's name. Each instance of this
 * class registers at most one subscriber in Redis (identified by {@link #subscriberId}) no
 * matter how many local listeners are attached; every message read by that subscriber is
 * dispatched to all matching local listeners.
 *
 * <p>Auxiliary keys, all derived from the object's name via {@code suffixName}:
 * <ul>
 *   <li>{@code subscribers} - sorted set of subscriber ids scored by a counter value;</li>
 *   <li>{@code map} - hash of subscriber id to the last stream id it has processed;</li>
 *   <li>{@code counter} - counter incremented each time a subscriber records progress;</li>
 *   <li>{@code timeout} - sorted set of subscriber ids scored by their expiration time.</li>
 * </ul>
 */
public class RedissonReliableTopic
extends RedissonExpirable
implements RReliableTopic {
    private static final Logger log = LoggerFactory.getLogger(RedissonReliableTopic.class);

    /** Delay before a failed stream read is retried, so an outage does not cause a busy loop. */
    private static final long POLL_RETRY_DELAY_MILLIS = 1000L;

    /** Local listeners keyed by the id handed back to the caller. */
    private final Map<String, Entry> listeners = new ConcurrentHashMap<>();
    /** Id of the subscriber registered in Redis by this instance, or {@code null} if none. */
    private final AtomicReference<String> subscriberId = new AtomicReference<>();
    /** Currently outstanding blocking read; {@code null} until the first poll has been started. */
    private volatile RFuture<Map<StreamMessageId, Map<String, Object>>> readFuture;
    /** Pending watchdog task; {@code null} until the first subscriber has been registered. */
    private volatile Timeout timeoutTask;

    public RedissonReliableTopic(Codec codec, CommandAsyncExecutor commandExecutor, String name) {
        super(codec, commandExecutor, name);
    }

    public RedissonReliableTopic(CommandAsyncExecutor commandExecutor, String name) {
        super(commandExecutor, name);
    }

    private String getSubscribersName() {
        return suffixName(this.getName(), "subscribers");
    }

    private String getMapName() {
        return suffixName(this.getName(), "map");
    }

    private String getCounter() {
        return suffixName(this.getName(), "counter");
    }

    private String getTimeout() {
        return suffixName(this.getName(), "timeout");
    }

    /** Watchdog timeout taken from the client configuration. */
    private long watchdogTimeout() {
        return this.commandExecutor.getConnectionManager().getCfg().getReliableTopicWatchdogTimeout();
    }

    /** Blocking variant of {@link #publishAsync(Object)}. */
    @Override
    public long publish(Object message) {
        return this.get(this.publishAsync(message));
    }

    /** Blocking variant of {@link #addListenerAsync(Class, MessageListener)}. */
    @Override
    public <M> String addListener(Class<M> type, MessageListener<M> listener) {
        return this.get(this.addListenerAsync(type, listener));
    }

    /** Blocking variant of {@link #removeListenerAsync(String...)}. */
    @Override
    public void removeListener(String ... listenerIds) {
        this.get(this.removeListenerAsync(listenerIds));
    }

    /** Blocking variant of {@link #removeAllListenersAsync()}. */
    @Override
    public void removeAllListeners() {
        this.get(this.removeAllListenersAsync());
    }

    /**
     * Drops every local listener and unregisters this instance's subscriber from Redis.
     *
     * @return future completed once the subscriber has been removed (immediately if there
     *         was no subscriber)
     */
    @Override
    public RFuture<Void> removeAllListenersAsync() {
        this.listeners.clear();
        return this.removeSubscriber();
    }

    /** Blocking variant of {@link #sizeAsync()}. */
    @Override
    public long size() {
        return this.get(this.sizeAsync());
    }

    /**
     * Issues {@code XLEN} against the stream key.
     *
     * @return future with the number of entries currently held in the stream
     */
    @Override
    public RFuture<Long> sizeAsync() {
        return this.commandExecutor.readAsync(this.getName(), StringCodec.INSTANCE, RedisCommands.XLEN, this.getName());
    }

    /**
     * @return number of listeners attached to this instance (local state only, no Redis call)
     */
    @Override
    public int countListeners() {
        return this.listeners.size();
    }

    /**
     * Appends the encoded message to the stream under field {@code m}.
     *
     * @param message message to publish
     * @return future with the cardinality of the subscribers set at the time of publishing
     */
    @Override
    public RFuture<Long> publishAsync(Object message) {
        return this.commandExecutor.evalWriteAsync(this.getName(), StringCodec.INSTANCE, RedisCommands.EVAL_LONG,
                "redis.call('xadd', KEYS[1], '*', 'm', ARGV[1]); return redis.call('zcard', KEYS[2]); ",
                Arrays.asList(this.getName(), this.getSubscribersName()),
                this.encode(message));
    }

    /**
     * Generates a random 16-byte identifier rendered as a hex string.
     *
     * @return new listener/subscriber id
     */
    protected String generateId() {
        byte[] id = new byte[16];
        ThreadLocalRandom.current().nextBytes(id);
        return ByteBufUtil.hexDump(id);
    }

    /**
     * Attaches a listener. The first listener attached to an instance without an active
     * subscriber also registers that subscriber in Redis and starts polling the stream from
     * the current time; later listeners simply join the existing subscriber.
     *
     * <p>If the registration script fails, the local state created by this call (listener
     * entry, subscriber id, watchdog task) is rolled back so that a subsequent call can
     * attempt the registration again instead of silently attaching to a subscriber that was
     * never created.
     *
     * @param type     message class the listener is interested in
     * @param listener callback invoked for every message that is an instance of {@code type}
     * @return future with the id to pass to {@link #removeListenerAsync(String...)}
     */
    @Override
    public <M> RFuture<String> addListenerAsync(Class<M> type, MessageListener<M> listener) {
        String id = this.generateId();
        this.listeners.put(id, new Entry(type, listener));

        // Only the caller that wins the CAS registers the subscriber; everyone else piggybacks.
        if (this.subscriberId.get() != null || !this.subscriberId.compareAndSet(null, id)) {
            return RedissonPromise.newSucceededFuture(id);
        }

        this.renewExpiration();
        StreamMessageId startId = new StreamMessageId(System.currentTimeMillis(), 0L);
        RedissonPromise<String> promise = new RedissonPromise<String>();
        // Script: bump the counter, record the expiration time, add the subscriber scored by
        // the counter value and remember the stream id it starts reading from.
        RFuture<Void> addFuture = this.commandExecutor.evalWriteAsync(this.getName(), StringCodec.INSTANCE, RedisCommands.EVAL_VOID,
                "local value = redis.call('incr', KEYS[3]); redis.call('zadd', KEYS[4], ARGV[3], ARGV[2]); redis.call('zadd', KEYS[1], value, ARGV[2]); redis.call('hset', KEYS[2], ARGV[2], ARGV[1]); ",
                Arrays.asList(this.getSubscribersName(), this.getMapName(), this.getCounter(), this.getTimeout()),
                startId, id, System.currentTimeMillis() + this.watchdogTimeout());
        addFuture.onComplete((r, e) -> {
            if (e != null) {
                // Roll back so the instance is not left believing it has a subscriber.
                this.listeners.remove(id);
                Timeout task = this.timeoutTask;
                if (task != null) {
                    task.cancel();
                }
                this.subscriberId.compareAndSet(id, null);
                promise.tryFailure(e);
                return;
            }
            this.poll(id, startId);
            promise.trySuccess(id);
        });
        return promise;
    }

    /**
     * Issues a blocking {@code XREAD} for entries after {@code startId}, dispatches them to the
     * local listeners, records the subscriber's progress in Redis and then polls again.
     *
     * @param id      subscriber id this polling loop belongs to
     * @param startId stream id after which entries are requested
     */
    private void poll(String id, StreamMessageId startId) {
        RFuture<Map<StreamMessageId, Map<String, Object>>> future = this.commandExecutor.readAsync(this.getName(),
                new CompositeCodec(StringCodec.INSTANCE, this.codec), RedisCommands.XREAD_BLOCKING_SINGLE,
                "BLOCK", 0, "STREAMS", this.getName(), startId);
        this.readFuture = future;
        future.onComplete((res, ex) -> {
            // Check the future this callback belongs to, not whatever the field holds by now.
            if (future.isCancelled()) {
                return;
            }
            if (ex != null) {
                if (ex instanceof RedissonShutdownException) {
                    return;
                }
                log.error("Unable to read messages of reliable topic {}", this.getName(), ex);
                // Retry after a pause; skip the retry if the subscriber was removed meanwhile.
                this.commandExecutor.getConnectionManager().newTimeout(t -> {
                    if (id.equals(this.subscriberId.get())) {
                        this.poll(id, startId);
                    }
                }, POLL_RETRY_DELAY_MILLIS, TimeUnit.MILLISECONDS);
                return;
            }

            // Listener callbacks run on the shared executor, not on the thread completing the read.
            this.commandExecutor.getConnectionManager().getExecutor().execute(() -> {
                for (Map<String, Object> entry : res.values()) {
                    Object m = entry.get("m");
                    for (Entry e : this.listeners.values()) {
                        if (e.getType().isInstance(m)) {
                            // Safe: the entry's type was checked against the message just above.
                            @SuppressWarnings("unchecked")
                            MessageListener<Object> target = (MessageListener<Object>) e.getListener();
                            target.onMessage(this.getName(), m);
                        }
                    }
                }
            });

            if (this.listeners.isEmpty()) {
                return;
            }

            // Last key in iteration order; presumably the result map preserves stream order.
            StreamMessageId lastId = res.keySet().stream().skip(res.size() - 1).findFirst().get();
            long time = System.currentTimeMillis();
            // Script: (1) if this subscriber is still registered, bump its score and store lastId;
            // (2) evict the subscriber with the oldest expiration time if it is older than `time`;
            // (3) delete or trim the stream based on the id stored for the lowest-scored subscriber.
            // Returns whether this subscriber was still registered.
            // NOTE(review): t[2] / v[1] look like they would be nil when the timeout or
            // subscribers set is empty - confirm whether that can happen here.
            RFuture<Boolean> updateFuture = this.commandExecutor.evalWriteAsync(this.getName(), StringCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN,
                    "local r = redis.call('zscore', KEYS[2], ARGV[2]); if r ~= false then local value = redis.call('incr', KEYS[4]); redis.call('zadd', KEYS[2], value, ARGV[2]); redis.call('hset', KEYS[3], ARGV[2], ARGV[1]); end; local t = redis.call('zrange', KEYS[5], 0, 0, 'WITHSCORES'); if tonumber(t[2]) < tonumber(ARGV[3]) then redis.call('hdel', KEYS[3], t[1]); redis.call('zrem', KEYS[2], t[1]); redis.call('zrem', KEYS[5], t[1]); end; local v = redis.call('zrange', KEYS[2], 0, 0); local score = redis.call('hget', KEYS[3], v[1]); local range = redis.call('xrange', KEYS[1], score, '+'); if #range == 0 then redis.call('del', KEYS[1]); elseif #range == 1 and range[1][1] == score then redis.call('del', KEYS[1]); else redis.call('xtrim', KEYS[1], 'maxlen', #range); end;return r ~= false; ",
                    Arrays.asList(this.getName(), this.getSubscribersName(), this.getMapName(), this.getCounter(), this.getTimeout()),
                    lastId, id, time);
            updateFuture.onComplete((re, exc) -> {
                if (exc != null) {
                    if (exc instanceof RedissonShutdownException) {
                        return;
                    }
                    log.error("Unable to update subscriber status", exc);
                    return;
                }
                // Stop when the subscriber is gone from Redis or nobody is listening locally.
                if (!re.booleanValue() || this.listeners.isEmpty()) {
                    return;
                }
                this.poll(id, lastId);
            });
        });
    }

    /** Deletes the stream together with all auxiliary keys. */
    @Override
    public RFuture<Boolean> deleteAsync() {
        return this.deleteAsync(this.getName(), this.getSubscribersName(), this.getMapName(), this.getCounter(), this.getTimeout());
    }

    /** Reports the memory used by the stream together with all auxiliary keys. */
    @Override
    public RFuture<Long> sizeInMemoryAsync() {
        return super.sizeInMemoryAsync(Arrays.asList(this.getName(), this.getSubscribersName(), this.getMapName(), this.getCounter(), this.getTimeout()));
    }

    /** Applies the time-to-live to the stream and all auxiliary keys. */
    @Override
    public RFuture<Boolean> expireAsync(long timeToLive, TimeUnit timeUnit) {
        return this.expireAsync(timeToLive, timeUnit, this.getName(), this.getSubscribersName(), this.getMapName(), this.getCounter(), this.getTimeout());
    }

    /** Applies the expiration timestamp to the stream and all auxiliary keys. */
    @Override
    public RFuture<Boolean> expireAtAsync(long timestamp) {
        return this.expireAtAsync(timestamp, this.getName(), this.getSubscribersName(), this.getMapName(), this.getCounter(), this.getTimeout());
    }

    /** Clears the expiration of the stream and all auxiliary keys. */
    @Override
    public RFuture<Boolean> clearExpireAsync() {
        return this.clearExpireAsync(this.getName(), this.getSubscribersName(), this.getMapName(), this.getCounter(), this.getTimeout());
    }

    /**
     * Detaches the given listeners. When the last local listener is gone the subscriber is
     * unregistered from Redis as well.
     *
     * @param listenerIds ids previously returned by {@code addListener}/{@code addListenerAsync}
     * @return future completed once the removal (and, if needed, the unsubscription) is done
     */
    @Override
    public RFuture<Void> removeListenerAsync(String ... listenerIds) {
        this.listeners.keySet().removeAll(Arrays.asList(listenerIds));
        if (this.listeners.isEmpty()) {
            return this.removeSubscriber();
        }
        return RedissonPromise.newSucceededFuture(null);
    }

    /**
     * Cancels the outstanding read and the watchdog task, then removes this instance's
     * subscriber id from the subscribers set, the progress map and the timeout set.
     *
     * <p>Safe to call when nothing has been registered yet: the read future and the watchdog
     * task may still be {@code null}, and with no subscriber id there is nothing to remove.
     *
     * @return future completed once the removal script has run, or an already completed
     *         future when there was no subscriber
     */
    private RFuture<Void> removeSubscriber() {
        RFuture<?> pendingRead = this.readFuture;
        if (pendingRead != null) {
            pendingRead.cancel(false);
        }
        Timeout task = this.timeoutTask;
        if (task != null) {
            task.cancel();
        }

        String id = this.subscriberId.getAndSet(null);
        if (id == null) {
            return RedissonPromise.newSucceededFuture(null);
        }
        return this.commandExecutor.evalWriteAsync(this.getName(), StringCodec.INSTANCE, RedisCommands.EVAL_VOID,
                "redis.call('zrem', KEYS[3], ARGV[1]); redis.call('zrem', KEYS[1], ARGV[1]); redis.call('hdel', KEYS[2], ARGV[1]); ",
                Arrays.asList(this.getSubscribersName(), this.getMapName(), this.getTimeout()),
                id);
    }

    /** Blocking variant of {@link #countSubscribersAsync()}. */
    @Override
    public int countSubscribers() {
        return this.get(this.countSubscribersAsync());
    }

    /**
     * Issues {@code ZCARD} against the subscribers set.
     *
     * @return future with the number of subscribers registered in Redis
     */
    @Override
    public RFuture<Integer> countSubscribersAsync() {
        return this.commandExecutor.readAsync(this.getName(), StringCodec.INSTANCE, RedisCommands.ZCARD_INT, this.getSubscribersName());
    }

    /**
     * Schedules the watchdog: after a third of the watchdog timeout the subscriber's entry in
     * the timeout set is pushed forward by a full watchdog timeout, and the task reschedules
     * itself for as long as the subscriber is still present in that set.
     */
    private void renewExpiration() {
        this.timeoutTask = this.commandExecutor.getConnectionManager().newTimeout(t -> {
            String id = this.subscriberId.get();
            if (id == null) {
                // Subscriber was removed between scheduling and firing; nothing to renew.
                return;
            }
            RFuture<Boolean> future = this.commandExecutor.evalWriteAsync(this.getName(), StringCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN,
                    "if redis.call('zscore', KEYS[1], ARGV[2]) == false then return 0; end; redis.call('zadd', KEYS[1], ARGV[1], ARGV[2]); return 1; ",
                    Arrays.asList(this.getTimeout()),
                    System.currentTimeMillis() + this.watchdogTimeout(), id);
            future.onComplete((res, e) -> {
                if (e != null) {
                    log.error("Can't update reliable topic {} expiration time", this.getName(), e);
                    return;
                }
                if (res.booleanValue()) {
                    this.renewExpiration();
                }
            });
        }, this.watchdogTimeout() / 3L, TimeUnit.MILLISECONDS);
    }

    /** Pairs a listener with the message type it was registered for. */
    private static class Entry {
        private final Class<?> type;
        private final MessageListener<?> listener;

        Entry(Class<?> type, MessageListener<?> listener) {
            this.type = type;
            this.listener = listener;
        }

        public Class<?> getType() {
            return this.type;
        }

        public MessageListener<?> getListener() {
            return this.listener;
        }
    }
}

