/*
 * Decompiled with CFR 0.152.
 */
package com.techempower.gemini.cluster.server;

import com.techempower.TechEmpowerApplication;
import com.techempower.asynchronous.Asynchronous;
import com.techempower.gemini.cluster.ClusterConstants;
import com.techempower.gemini.cluster.Message;
import com.techempower.gemini.cluster.MessageProcessor;
import com.techempower.gemini.cluster.RequestMessage;
import com.techempower.gemini.cluster.ResponseMessage;
import com.techempower.gemini.cluster.message.CachedRelationMessage;
import com.techempower.gemini.cluster.message.LogNoteBroadcast;
import com.techempower.gemini.cluster.message.TestBroadcast;
import com.techempower.gemini.cluster.message.UnavailableResponse;
import com.techempower.gemini.cluster.server.ClusterHandler;
import com.techempower.gemini.cluster.server.ConfigurationException;
import com.techempower.gemini.cluster.server.MasterUtilities;
import com.techempower.gemini.cluster.server.ServerConnection;
import com.techempower.gemini.cluster.server.ServerTransport;
import com.techempower.gemini.cluster.server.handler.AdminHandler;
import com.techempower.gemini.cluster.server.handler.AuthenticationHandler;
import com.techempower.gemini.cluster.server.handler.CacheMessageHandler;
import com.techempower.gemini.cluster.server.handler.ClusterStatusHandler;
import com.techempower.gemini.cluster.server.handler.ConfigurationHandler;
import com.techempower.gemini.cluster.server.handler.RelayHandler;
import com.techempower.helper.StringHelper;
import com.techempower.thread.EndableThread;
import com.techempower.util.EnhancedProperties;
import java.io.File;
import java.lang.reflect.Constructor;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicInteger;

public class Master
extends MessageProcessor
implements Asynchronous {
    public static final String PROPS_PREFIX = "Master.";
    public static final long PROCESSING_TIME_WARNING_THRESHOLD = 1000L;
    public static final int PERIODIC_MESSAGE_COUNT = 1000;
    private final TechEmpowerApplication application;
    private final Channels channels;
    private final Map<Class<? extends Message>, ClusterHandler<? extends Message>> handlers;
    private final ExecutorService threadPool;
    private final AtomicInteger nodeCounter = new AtomicInteger();
    private final OutboundConsumer outboundConsumer;
    private boolean displayClientDescription;
    private ServerTransport transport;
    private File dataDirectory;
    private Runnable nodeIdPersister = new Runnable(){

        @Override
        public void run() {
            MasterUtilities.writeNodeCountFile(Master.this.dataDirectory, Master.this);
        }
    };

    public Master(TechEmpowerApplication application) {
        super(application);
        this.application = application;
        this.handlers = new HashMap<Class<? extends Message>, ClusterHandler<? extends Message>>();
        this.nodeCounter.set(0);
        this.threadPool = Executors.newCachedThreadPool();
        this.channels = new Channels();
        this.outboundConsumer = new OutboundConsumer();
        this.addHandlers();
    }

    protected void configure(EnhancedProperties props) throws ConfigurationException {
        this.application.getApplicationLog().configure(props, this.application.getVersion());
        String transportClassName = props.getProperty("Master.TransportClassname");
        this.createTransport(transportClassName);
        String dataDir = "data" + File.separator;
        dataDir = props.getProperty("Master.DataDirectory", dataDir);
        this.dataDirectory = new File(dataDir);
        this.dataDirectory.mkdirs();
        if (!this.dataDirectory.exists() || !this.dataDirectory.isDirectory()) {
            throw new ConfigurationException("Data directory (" + dataDir + ") not available.");
        }
        this.getLog().log("Data: " + this.dataDirectory.getAbsolutePath());
        int highestNodeId = MasterUtilities.readNodeCountFile(this.dataDirectory);
        this.getLog().log("Starting node IDs at " + (highestNodeId + 1) + ".");
        this.nodeCounter.set(highestNodeId);
        this.transport.configure(props);
        this.transport.setMaster(this);
        for (ClusterHandler<? extends Message> handler : this.handlers.values()) {
            handler.configure(props);
        }
        this.displayClientDescription = props.getYesNoProperty("Master.DisplayClientDescription", false);
    }

    protected void createTransport(String transportClassName) throws ConfigurationException {
        if (StringHelper.isNonEmpty(transportClassName)) {
            try {
                Class<?> transportClass = Class.forName(transportClassName);
                Constructor<?> constructor = transportClass.getConstructor(MessageProcessor.getTransportConstructorArguments());
                ServerTransport transportToLoad = (ServerTransport)constructor.newInstance(this.application);
                this.getLog().log("Loaded " + transportToLoad.getTransportName() + ".");
                this.transport = transportToLoad;
            }
            catch (ClassNotFoundException cnfexc) {
                throw new ConfigurationException("Could not find specified Transport class.", cnfexc);
            }
            catch (NoSuchMethodException nsmexc) {
                throw new ConfigurationException("Transport does not provide a proper constructor.", nsmexc);
            }
            catch (NoClassDefFoundError ncdfer) {
                throw new ConfigurationException("Problem loading Transport class.", ncdfer);
            }
            catch (Exception exc) {
                throw new ConfigurationException(exc);
            }
        } else {
            throw new ConfigurationException("Master.TransportClassname must be specified in configuration file.");
        }
    }

    protected void addHandlers() {
        this.addHandler(new AuthenticationHandler());
        this.addHandler(new CacheMessageHandler(this.application));
        this.addHandler(new RelayHandler<CachedRelationMessage>(CachedRelationMessage.class));
        this.addHandler(new RelayHandler<TestBroadcast>(TestBroadcast.class));
        this.addHandler(new RelayHandler<LogNoteBroadcast>(LogNoteBroadcast.class));
        this.addHandler(new ClusterStatusHandler(this.application));
        this.addHandler(new ConfigurationHandler(this.application));
        this.addHandler(new AdminHandler());
    }

    public <M extends Message> void addHandler(ClusterHandler<M> handler) {
        this.handlers.put(handler.getMessageClass(), handler);
    }

    public Channels getChannels() {
        return this.channels;
    }

    public int getHighestAssignedNodeId() {
        return this.nodeCounter.get();
    }

    protected Executor getExecutor() {
        return this.threadPool;
    }

    protected void persistHighestAssignedNodeId() {
        this.getExecutor().execute(this.nodeIdPersister);
    }

    public <M extends Message> void process(ServerConnection client, M message) {
        long count = client.incrementMessageCount();
        if (client.getNode() == 0) {
            if (message.getSourceNode() == 0 || message.getSourceNode() > this.nodeCounter.get()) {
                int newNodeId = this.nodeCounter.incrementAndGet();
                this.persistHighestAssignedNodeId();
                client.setNode(newNodeId);
                this.getLog().log("New connection: " + client);
            } else {
                client.setNode(message.getSourceNode());
                this.getLog().log("Node " + message.getSourceNode() + " reconnected.");
            }
        }
        if (count % 1000L == 0L) {
            this.getLog().log("Received " + count + " from node " + client.getNode() + " (connected " + client.getConnectionUptime() + "ms).", 10);
        }
        if (message instanceof ResponseMessage && this.wakeup((ResponseMessage)message)) {
            return;
        }
        ClusterHandler<?> handler = this.getHandler(message.getClass());
        if (handler != null) {
            if (!handler.requiresAuthentication() || client.isAuthenticated()) {
                this.invokeHandler(handler, client, message, this);
            } else {
                this.getLog().log("Rejected unauthenicated request for " + handler.getMessageClass().getName());
            }
        } else if (message instanceof RequestMessage) {
            this.sendResponse(client, (RequestMessage)message, new UnavailableResponse());
        }
    }

    protected <M extends Message> void invokeHandler(final ClusterHandler<M> handler, final ServerConnection client, final M message, final Master master) {
        if (handler.requiresSeparateThread()) {
            Runnable handle = new Runnable(){

                @Override
                public void run() {
                    try {
                        handler.handle(client, message, master);
                    }
                    catch (Exception exc) {
                        Master.this.getLog().log("Exception while handling message within Executor: " + message, exc);
                    }
                }
            };
            this.getExecutor().execute(handle);
        } else {
            long startTime = System.currentTimeMillis();
            try {
                handler.handle(client, message, master);
            }
            catch (Exception exc) {
                this.getLog().log("Exception while handling message: " + message, exc);
            }
            long processingTime = System.currentTimeMillis() - startTime;
            if (processingTime > 1000L) {
                this.getLog().log("Processing of " + message + " required " + processingTime + "ms; but " + handler.getClass().getName() + " does not specify that a separate thread should be used!", 70);
            }
        }
    }

    public void disconnect(ServerConnection client) {
        this.getChannels().remove(client);
        this.getLog().log("Node " + client.getNode() + " disconnected (connected " + client.getConnectionUptime() + "ms; sent " + client.getMessageCount() + " messages)" + (this.displayClientDescription ? " [" + client.getClientDescription() + "]" : "") + ".");
    }

    public void sendResponse(ServerConnection client, RequestMessage request, ResponseMessage response) {
        if (request.getSourceNode() != 0 && client.getNode() != request.getSourceNode()) {
            this.getLog().log("Attempt to send response to wrong client.", 70);
            return;
        }
        response.setExclusiveConnection(client);
        this.sendResponse(request, response);
    }

    public void send(ServerConnection clientToOmit, Message message) {
        message.setOmittedConnection(clientToOmit);
        this.markOutboundMessage(message);
        this.addToOutboundQueue(message);
    }

    protected boolean processOutboundQueue() {
        try {
            Message message = this.getOutbound().peek();
            boolean messageSent = false;
            while (message != null) {
                message = this.getOutbound().poll();
                Set<ServerConnection> destinations = this.getChannels().get(message.getChannel());
                if (message instanceof RequestMessage) {
                    if (destinations != null && destinations.size() > 0) {
                        RequestMessage rm = (RequestMessage)message;
                        rm.setExpectedResponseCount(destinations.size());
                        this.transport.send(rm, destinations);
                        messageSent = true;
                    } else {
                        this.wakeupNoResponse(message.getMessageId());
                    }
                } else {
                    this.transport.send(message, destinations);
                }
                message = this.getOutbound().peek();
            }
            return messageSent;
        }
        catch (Exception exc) {
            this.getLog().log("Exception while processing outbound queue", exc);
            return false;
        }
    }

    public Iterator<ServerConnection> getConnections() {
        if (this.transport != null) {
            return this.transport.getConnections();
        }
        return null;
    }

    protected <M extends Message> ClusterHandler<M> getHandler(Class<M> clazz) {
        return this.handlers.get(clazz);
    }

    @Override
    public void begin() {
        this.getLog().log("Master server starting.");
        this.transport.begin();
        this.outboundConsumer.begin();
    }

    @Override
    public void end() {
        this.getLog().log("Master server stopping.");
        this.transport.end();
        this.outboundConsumer.end();
    }

    public class Channels {
        private ConcurrentHashMap<Integer, HashSet<ServerConnection>> map = new ConcurrentHashMap();

        public void add(ServerConnection connection) {
            HashSet<ServerConnection> connections = this.map.get(connection.getChannel());
            connections = connections == null ? new HashSet(1) : new HashSet<ServerConnection>(connections);
            connections.add(connection);
            this.map.put(connection.getChannel(), connections);
            Master.this.getLog().log("Connection " + connection.getNode() + " added to channel " + connection.getChannel() + " (" + ClusterConstants.channelName(connection.getChannel()) + ")" + (Master.this.displayClientDescription ? " [" + connection.getClientDescription() + "]" : "") + ".");
        }

        public void remove(ServerConnection connection) {
            HashSet<ServerConnection> connections = this.map.get(connection.getChannel());
            if (connections != null) {
                connections = new HashSet<ServerConnection>(connections);
                connections.remove(connection);
                this.map.put(connection.getChannel(), connections);
            }
        }

        public Set<ServerConnection> get(int channel) {
            return this.map.get(channel);
        }
    }

    class OutboundConsumer
    extends EndableThread {
        public OutboundConsumer() {
            super("Cluster master outbound message consumer", 25, 250, 5, 5);
        }

        @Override
        public void run() {
            while (this.checkPause()) {
                try {
                    if (Master.this.processOutboundQueue()) {
                        this.setMinimumSleep();
                    } else {
                        this.incrementSleep();
                    }
                    this.simpleSleep();
                }
                catch (Exception exc) {
                    Master.this.getLog().log("Outbound Consumer exception.", exc);
                }
            }
            Master.this.getLog().log("Outbound message consumer stopping.");
        }
    }
}

