package org.apache.storm.utils;

import com.google.common.base.Strings;
import java.io.File;
import java.io.FileOutputStream;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.Set;
import org.apache.storm.generated.SpoutSpec;
import org.apache.storm.generated.StormTopology;
import org.json.simple.JSONValue;
import org.json.simple.parser.ParseException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/storm/utils/TopologySpoutLag.class */
public class TopologySpoutLag {
    private static final String SPOUT_ID = "spoutId";
    private static final String SPOUT_TYPE = "spoutType";
    private static final String SPOUT_LAG_RESULT = "spoutLagResult";
    private static final String ERROR_INFO = "errorInfo";
    private static final String CONFIG_KEY_PREFIX = "config.";
    private static final String TOPICS_CONFIG = "config.topics";
    private static final String GROUPID_CONFIG = "config.groupid";
    private static final String BOOTSTRAP_CONFIG = "config.bootstrap.servers";
    private static final String SECURITY_PROTOCOL_CONFIG = "config.security.protocol";
    private static final Set<String> ALL_CONFIGS = new HashSet(Arrays.asList(TOPICS_CONFIG, GROUPID_CONFIG, BOOTSTRAP_CONFIG, SECURITY_PROTOCOL_CONFIG));
    private static final Logger LOGGER = LoggerFactory.getLogger(TopologySpoutLag.class);

    public static Map<String, Map<String, Object>> lag(StormTopology stormTopology, Map<String, Object> map) {
        HashMap hashMap = new HashMap();
        for (Map.Entry entry : stormTopology.get_spouts().entrySet()) {
            try {
                addLagResultForKafkaSpout(hashMap, (String) entry.getKey(), (SpoutSpec) entry.getValue());
            } catch (Exception e) {
                LOGGER.warn("Exception thrown while getting lag for spout id: " + ((String) entry.getKey()));
                LOGGER.warn("Exception message:" + e.getMessage(), e);
            }
        }
        return hashMap;
    }

    private static List<String> getCommandLineOptionsForNewKafkaSpout(Map<String, Object> map) {
        LOGGER.debug("json configuration: {}", map);
        ArrayList arrayList = new ArrayList();
        arrayList.add("-t");
        arrayList.add((String) map.get(TOPICS_CONFIG));
        arrayList.add("-g");
        arrayList.add((String) map.get(GROUPID_CONFIG));
        arrayList.add("-b");
        arrayList.add((String) map.get(BOOTSTRAP_CONFIG));
        String str = (String) map.get(SECURITY_PROTOCOL_CONFIG);
        if (!Strings.isNullOrEmpty(str)) {
            arrayList.add("-s");
            arrayList.add(str);
        }
        return arrayList;
    }

    private static File createExtraPropertiesFile(Map<String, Object> map) {
        File file = null;
        HashMap hashMap = new HashMap();
        for (Map.Entry<String, Object> entry : map.entrySet()) {
            if (entry.getKey().startsWith(CONFIG_KEY_PREFIX) && !ALL_CONFIGS.contains(entry.getKey())) {
                hashMap.put(entry.getKey().substring(CONFIG_KEY_PREFIX.length()), entry.getValue().toString());
            }
        }
        if (!hashMap.isEmpty()) {
            try {
                file = File.createTempFile("kafka-consumer-extra", "props");
                file.deleteOnExit();
                Properties properties = new Properties();
                properties.putAll(hashMap);
                FileOutputStream fileOutputStream = new FileOutputStream(file);
                Throwable th = null;
                try {
                    try {
                        properties.store(fileOutputStream, "Kafka consumer extra properties");
                        if (fileOutputStream != null) {
                            if (0 != 0) {
                                try {
                                    fileOutputStream.close();
                                } catch (Throwable th2) {
                                    th.addSuppressed(th2);
                                }
                            } else {
                                fileOutputStream.close();
                            }
                        }
                    } finally {
                    }
                } finally {
                }
            } catch (IOException e) {
            }
        }
        return file;
    }

    private static void addLagResultForKafkaSpout(Map<String, Map<String, Object>> map, String str, SpoutSpec spoutSpec) throws IOException {
        String str2 = spoutSpec.get_common().get_json_conf();
        if (Strings.isNullOrEmpty(str2)) {
            return;
        }
        try {
            Map map2 = (Map) JSONValue.parseWithException(str2);
            if (map2.containsKey(TOPICS_CONFIG) && map2.containsKey(GROUPID_CONFIG) && map2.containsKey(BOOTSTRAP_CONFIG)) {
                map.put(str, getLagResultForNewKafkaSpout(str, spoutSpec));
            }
        } catch (ParseException e) {
            throw new IOException((Throwable) e);
        }
    }

    private static Map<String, Object> getLagResultForKafka(String str, SpoutSpec spoutSpec) throws IOException {
        String str2 = spoutSpec.get_common().get_json_conf();
        Map map = null;
        String str3 = "Make sure Kafka spout version is latest and " + TOPICS_CONFIG + ", " + GROUPID_CONFIG + " & " + BOOTSTRAP_CONFIG + " are not null for newer versions of Kafka spout.";
        if (!Strings.isNullOrEmpty(str2)) {
            ArrayList arrayList = new ArrayList();
            String str4 = System.getenv("STORM_BASE_DIR");
            if (str4 != null && !str4.endsWith("/")) {
                str4 = str4 + File.separator;
            }
            arrayList.add(str4 != null ? str4 + "bin" + File.separator + "storm-kafka-monitor" : "storm-kafka-monitor");
            try {
                Map map2 = (Map) JSONValue.parseWithException(str2);
                arrayList.addAll(getCommandLineOptionsForNewKafkaSpout(map2));
                File createExtraPropertiesFile = createExtraPropertiesFile(map2);
                if (createExtraPropertiesFile != null) {
                    arrayList.add("-c");
                    arrayList.add(createExtraPropertiesFile.getAbsolutePath());
                }
                LOGGER.debug("Command to run: {}", arrayList);
                if (!arrayList.contains(null)) {
                    try {
                        String execCommand = new ShellCommandRunnerImpl().execCommand((String[]) arrayList.toArray(new String[0]));
                        try {
                            map = (Map) JSONValue.parseWithException(execCommand);
                        } catch (ParseException e) {
                            LOGGER.debug("JSON parsing failed, assuming message as error message: {}", execCommand);
                            str3 = execCommand;
                        }
                    } finally {
                        if (createExtraPropertiesFile != null) {
                            createExtraPropertiesFile.delete();
                        }
                    }
                }
            } catch (ParseException e2) {
                throw new IOException((Throwable) e2);
            }
        }
        HashMap hashMap = new HashMap();
        hashMap.put(SPOUT_ID, str);
        hashMap.put(SPOUT_TYPE, "KAFKA");
        if (map != null) {
            hashMap.put(SPOUT_LAG_RESULT, map);
        } else {
            hashMap.put(ERROR_INFO, str3);
        }
        return hashMap;
    }

    private static Map<String, Object> getLagResultForNewKafkaSpout(String str, SpoutSpec spoutSpec) throws IOException {
        return getLagResultForKafka(str, spoutSpec);
    }
}
