/*
 * Decompiled with CFR 0.152.
 */
package com.techempower.gemini.cluster.client;

import com.techempower.asynchronous.Asynchronous;
import com.techempower.gemini.GeminiApplication;
import com.techempower.gemini.GeminiHelper;
import com.techempower.gemini.InitConfig;
import com.techempower.gemini.cluster.Message;
import com.techempower.gemini.cluster.MessageProcessor;
import com.techempower.gemini.cluster.MessageQueue;
import com.techempower.gemini.cluster.RequestMessage;
import com.techempower.gemini.cluster.ResponseMessage;
import com.techempower.gemini.cluster.client.ClientTransport;
import com.techempower.gemini.cluster.client.ClusterClientCallback;
import com.techempower.gemini.cluster.client.MessageHandler;
import com.techempower.gemini.cluster.client.handler.CacheHandler;
import com.techempower.gemini.cluster.client.handler.CachedRelationHandler;
import com.techempower.gemini.cluster.message.AuthenticationRequest;
import com.techempower.gemini.cluster.message.AuthenticationResponse;
import com.techempower.helper.StringHelper;
import com.techempower.thread.EndableThread;
import com.techempower.util.Configurable;
import com.techempower.util.EnhancedProperties;
import java.lang.reflect.Constructor;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class ClusterClient
extends MessageProcessor
implements Configurable,
ClusterClientCallback,
Asynchronous {
    /** Prefix shared by the configuration property names read in {@code configure(EnhancedProperties)}. */
    public static final String PROPS_PREFIX = "ClusterClient.";
    /** Same value (1000) that {@code invokeHandler} compares an inline handler's elapsed milliseconds against before logging a warning. */
    public static final long PROCESSING_TIME_WARNING_THRESHOLD = 1000L;
    /** Application passed to the constructor; also handed to the default handlers and to the reflectively created transport. */
    private final GeminiApplication application;
    /** Messages received via {@code process(Message)}; drained by {@code processInboundQueue()}. */
    private final MessageQueue inbound;
    /** Registered handlers, keyed by the message class each handler reports via {@code getMessageClass()}. */
    private final Map<Class<? extends Message>, MessageHandler<? extends Message>> handlers;
    /** Cached thread pool returned by {@code getExecutor()}; runs handlers that require a separate thread. */
    private final ExecutorService threadPool;
    /** Transport created by {@code createTransport(String)}; stays {@code null} until a transport class is configured and loads successfully. */
    private ClientTransport transport;
    /** Set to {@code true} by {@code Authenticator} after a good response; reset to {@code false} in {@code connectIfNeeded()} when the transport is not connected. */
    private boolean connectedAuthenticated = false;
    /** Thread started by {@code begin()} that connects/authenticates and sends the outbound queue. */
    private OutboundConsumer outboundConsumer;
    /** Thread started by {@code begin()} that drains the inbound queue. */
    private InboundConsumer inboundConsumer;
    /** Channel number placed on the authentication request; defaults to 1. */
    private int channel = 1;
    /** Value of the "ClusterClient.Enabled" property; see {@code isEnabled()} for the effective state. */
    private boolean enabled = true;

    /**
     * Creates a cluster client for the given application.
     *
     * <p>Allocates the inbound queue, the handler registry and the cached
     * thread pool, then registers this instance with the application's
     * configurator and as one of its asynchronous components.
     *
     * @param application the owning application
     */
    public ClusterClient(GeminiApplication application) {
        super(application);

        // Own state first, so it is in place before this instance is handed out below.
        this.application = application;
        this.inbound = new MessageQueue();
        this.handlers = new HashMap<Class<? extends Message>, MessageHandler<? extends Message>>();
        this.threadPool = Executors.newCachedThreadPool();

        // Registration with the application (configurator first, then asynchronous list).
        application.getConfigurator().addConfigurable(this);
        application.addAsynchronous(this);
    }

    /**
     * Registers the default handlers: a {@code CacheHandler} followed by a
     * {@code CachedRelationHandler}, both constructed with this client's
     * application.
     */
    protected void addHandlers() {
        final CacheHandler cacheHandler = new CacheHandler(this.application);
        this.addHandler(cacheHandler);

        final CachedRelationHandler relationHandler = new CachedRelationHandler(this.application);
        this.addHandler(relationHandler);
    }

    /**
     * Registers a handler under the message class it reports via
     * {@code getMessageClass()}. A handler previously stored under the same
     * class is replaced.
     *
     * @param handler the handler to register
     * @param <M> the message type the handler processes
     */
    public <M extends Message> void addHandler(MessageHandler<M> handler) {
        final Class<? extends Message> messageType = handler.getMessageClass();
        this.handlers.put(messageType, handler);
    }

    /**
     * Starts the inbound and outbound consumer threads when this client is
     * enabled. A consumer is (re)created only if it has never been created or
     * its thread has reached the {@code TERMINATED} state. When the client is
     * not enabled, only a log entry is written.
     */
    @Override
    public void begin() {
        if (!this.isEnabled()) {
            this.getLog().log("Not starting ClusterClient threads because no valid Transport is available.", 70);
            return;
        }

        final boolean inboundStopped = this.inboundConsumer == null
            || this.inboundConsumer.getState() == Thread.State.TERMINATED;
        if (inboundStopped) {
            this.getLog().log("Starting inbound consumer thread.", 10);
            this.inboundConsumer = new InboundConsumer();
            this.inboundConsumer.begin();
        }

        final boolean outboundStopped = this.outboundConsumer == null
            || this.outboundConsumer.getState() == Thread.State.TERMINATED;
        if (outboundStopped) {
            this.getLog().log("Starting outbound consumer thread.", 10);
            this.outboundConsumer = new OutboundConsumer();
            this.outboundConsumer.begin();
        }
    }

    /**
     * Asks both consumer threads to end, inbound first and then outbound.
     * Consumers that were never created are skipped.
     */
    @Override
    public void end() {
        final InboundConsumer inboundThread = this.inboundConsumer;
        if (inboundThread != null) {
            inboundThread.end();
        }

        final OutboundConsumer outboundThread = this.outboundConsumer;
        if (outboundThread != null) {
            outboundThread.end();
        }
    }

    /**
     * Returns the executor used to run handlers that require a separate
     * thread; this is the cached thread pool created in the constructor.
     *
     * @return the executor for off-thread handler invocations
     */
    protected Executor getExecutor() {
        return threadPool;
    }

    /**
     * Returns the channel number that is placed on the authentication request.
     *
     * @return the current channel (1 unless changed via {@code setChannel})
     */
    public int getChannel() {
        return channel;
    }

    /**
     * Sets the channel number that is placed on subsequent authentication
     * requests.
     *
     * @param channel the channel number to use
     */
    public void setChannel(int channel) {
        this.channel = channel;
    }

    /**
     * Callback entry point: accepts an incoming message by placing it on the
     * inbound queue. Handling happens later, when the inbound queue is
     * drained.
     *
     * @param message the received message
     */
    @Override
    public void process(Message message) {
        addToInboundQueue(message);
    }

    /**
     * Adds the message to the inbound queue via {@code addDirectly}.
     *
     * @param message the message to enqueue
     */
    protected void addToInboundQueue(Message message) {
        inbound.addDirectly(message);
    }

    /**
     * Drains the inbound queue: polls messages one at a time and passes each
     * to {@code processInboundFromQueue} until the queue yields {@code null}.
     */
    protected void processInboundQueue() {
        Message next;
        while ((next = this.inbound.poll()) != null) {
            this.processInboundFromQueue(next);
        }
    }

    /**
     * Dispatches a single inbound message.
     *
     * <p>A {@code ResponseMessage} is first offered to {@code wakeup}; when
     * that call returns {@code true} the message is considered consumed and
     * nothing else happens. Otherwise the handler registered for the
     * message's exact runtime class is looked up and invoked; if none is
     * registered, a log entry is written.
     *
     * @param message the message taken from the inbound queue
     * @param <M> the static type of the message
     */
    protected <M extends Message> void processInboundFromQueue(M message) {
        if (message instanceof ResponseMessage && this.wakeup((ResponseMessage)message)) {
            return;
        }

        // The registry stores handlers as MessageHandler<? extends Message>, which cannot be
        // passed to invokeHandler together with an M-typed message without a cast. The cast is
        // safe as long as handlers are only stored under the class they report via
        // getMessageClass(), which is how addHandler registers them.
        @SuppressWarnings("unchecked")
        final MessageHandler<M> handler = (MessageHandler<M>)this.handlers.get(message.getClass());
        if (handler == null) {
            this.getLog().log("Unable to find MessageHandler for " + message.getClass().getName(), 10);
            return;
        }
        this.invokeHandler(handler, message);
    }

    /**
     * Runs a handler for a message, either on the executor or inline.
     *
     * <p>If the handler reports that it requires a separate thread, the call
     * is submitted to {@code getExecutor()} and this method returns
     * immediately. Otherwise the handler runs on the calling thread and its
     * duration is measured; a warning is logged when it takes longer than
     * 1000 ms. Exceptions thrown by the handler are logged and not
     * propagated in either mode.
     *
     * @param handler the handler to invoke
     * @param message the message to pass to the handler
     * @param <M> the message type
     */
    protected <M extends Message> void invokeHandler(final MessageHandler<M> handler, final M message) {
        if (handler.requiresSeparateThread()) {
            this.getExecutor().execute(new Runnable(){

                @Override
                public void run() {
                    try {
                        handler.handle(message);
                    }
                    catch (Exception exc) {
                        ClusterClient.this.getLog().log("Exception while handling message within Executor: " + message, exc);
                    }
                }
            });
            return;
        }

        // Inline execution. Elapsed time is taken from the monotonic clock so that wall-clock
        // adjustments cannot produce a bogus (negative or inflated) duration.
        final long startNanos = System.nanoTime();
        try {
            handler.handle(message);
        }
        catch (Exception exc) {
            this.getLog().log("Exception while handling message: " + message, exc);
        }
        final long processingTime = (System.nanoTime() - startNanos) / 1000000L;

        // 1000 ms is the value of PROCESSING_TIME_WARNING_THRESHOLD.
        if (processingTime > 1000L) {
            this.getLog().log("Processing of " + message + " required " + processingTime + "ms; but " + handler.getClass().getName() + " does not specify that a separate thread should be used!", 70);
        }
    }

    /**
     * Sends a request through {@code send} and returns the first response.
     *
     * @param request the request to send
     * @return the first response in the list returned by {@code send}, or
     *         {@code null} when that list is empty
     */
    public ResponseMessage sendRequest(RequestMessage request) {
        final List<ResponseMessage> responses = this.send(request);
        return responses.isEmpty() ? null : responses.get(0);
    }

    /**
     * Ensures the transport is connected and this client is authenticated.
     *
     * <p>When the transport reports it is not connected, the authenticated
     * flag is cleared, the transport callback is removed and a connection
     * attempt is made. If that attempt throws, the failure is logged and the
     * method returns {@code false} without trying to authenticate over the
     * unconnected transport. When the client is not yet authenticated, a new
     * {@code Authenticator} performs the handshake and its result is
     * returned.
     *
     * @return {@code true} if the client is authenticated when this method
     *         returns; {@code false} if connecting or authenticating failed
     */
    protected boolean connectIfNeeded() {
        if (!this.transport.isConnected()) {
            this.connectedAuthenticated = false;
            this.transport.setCallback(null);
            try {
                this.transport.connect();
            }
            catch (Exception exc) {
                this.getLog().log("Exception while attempting to connect.", exc);
                // No connection: authenticating now could only fail or stall until the
                // authentication timeout, so give up for this round.
                return false;
            }
        }

        if (this.connectedAuthenticated) {
            return true;
        }

        try {
            return new Authenticator().authenticate();
        }
        catch (Exception exc) {
            this.getLog().log("Exception while attempting to authenticate.", exc);
            return false;
        }
    }

    /**
     * Tells whether outbound messages may be sent right now.
     *
     * @return {@code true} only when the transport reports it is connected
     *         and this client has been authenticated
     */
    protected boolean clearToSend() {
        if (!this.transport.isConnected()) {
            return false;
        }
        return this.connectedAuthenticated;
    }

    /**
     * Sends queued outbound messages for as long as the client is clear to
     * send and the outbound queue has a head element.
     *
     * <p>Each message is removed from the queue, stamped with this node's id
     * and handed to the transport. Any exception ends the current pass and is
     * logged; a message that was already polled when the exception occurred
     * is not put back.
     */
    protected void processOutboundQueue() {
        try {
            if (!this.clearToSend()) {
                return;
            }
            // Peek decides whether there is work; poll actually removes the message to send.
            for (Message queued = this.getOutbound().peek();
                 this.clearToSend() && queued != null;
                 queued = this.getOutbound().peek()) {
                final Message outgoing = this.getOutbound().poll();
                outgoing.setSourceNode(this.getNodeId());
                this.transport.send(outgoing);
            }
        }
        catch (Exception exc) {
            this.getLog().log("Exception while processing outbound queue.", exc);
        }
    }

    /**
     * Applies configuration properties to this client.
     *
     * <p>Reads, in order: the transport class name (creating the transport if
     * none exists yet), the authentication key, the authentication timeout
     * (default 10000), and the enabled flag. The transport, when present, and
     * every registered handler are given the same properties to configure
     * themselves.
     *
     * @param props the properties to read
     */
    @Override
    public void configure(EnhancedProperties props) {
        this.createTransport(props.getProperty("ClusterClient.TransportClassname"));

        final String authenticationKey = props.getProperty("ClusterClient.Authentication.Key", this.getAuthenticationKey());
        this.setAuthenticationKey(authenticationKey);
        this.setAuthenticationTimeout(props.getLongProperty("ClusterClient.Authentication.Timeout", 10000L));

        final ClientTransport currentTransport = this.transport;
        if (currentTransport != null) {
            currentTransport.configure(props);
        }

        this.enabled = props.getYesNoProperty("ClusterClient.Enabled", this.enabled);

        for (MessageHandler<? extends Message> registered : this.handlers.values()) {
            registered.configure(props);
        }
    }

    /**
     * Configures this client from an {@code InitConfig} by converting it to
     * properties with {@code GeminiHelper.convertServletConfigToProps} and
     * delegating to {@code configure(EnhancedProperties)}.
     *
     * @param config the init configuration; ignored when {@code null}
     */
    public void configure(InitConfig config) {
        if (config == null) {
            return;
        }
        this.configure(GeminiHelper.convertServletConfigToProps(config));
    }

    /**
     * Reflectively creates the transport from a class name.
     *
     * <p>Does nothing when a transport already exists or the class name is
     * empty. Otherwise the class is loaded, the constructor matching
     * {@code MessageProcessor.getTransportConstructorArguments()} is invoked
     * with this client's application, and the new instance becomes the
     * transport. Every failure is logged and leaves the transport unset.
     *
     * @param transportClassName fully qualified name of the transport class
     */
    protected void createTransport(String transportClassName) {
        if (this.transport != null || !StringHelper.isNonEmpty(transportClassName)) {
            return;
        }

        try {
            final Class<?> loaded = Class.forName(transportClassName);
            final Constructor<?> ctor = loaded.getConstructor(MessageProcessor.getTransportConstructorArguments());
            final ClientTransport created = (ClientTransport)ctor.newInstance(this.application);
            this.getLog().log("Loaded " + created.getTransportName() + ".");
            this.transport = created;
        }
        catch (ClassNotFoundException cnfexc) {
            this.getLog().log("Could not find specified Transport class: " + cnfexc, 70);
        }
        catch (NoSuchMethodException nsmexc) {
            this.getLog().log("Transport does not provide a proper constructor: " + nsmexc, 70);
        }
        catch (NoClassDefFoundError ncdfer) {
            this.getLog().log("Problem loading Transport class: " + ncdfer, 70);
        }
        catch (Exception exc) {
            // Covers instantiation problems, including a class that is not a ClientTransport.
            this.getLog().log("Unable to create Transport.", 70, exc);
        }
    }

    /**
     * Reports whether this client can run.
     *
     * @return {@code true} only when the enabled flag is set and a transport
     *         has been created
     */
    @Override
    public boolean isEnabled() {
        if (!this.enabled) {
            return false;
        }
        return this.transport != null;
    }

    /**
     * Performs the authentication handshake with the cluster master.
     *
     * <p>While authenticating, this object is installed as the transport's
     * callback so that it receives the {@code AuthenticationResponse}. The
     * response field is only read and written while holding {@code monitor},
     * so a response that arrives before the waiting thread starts to wait is
     * never missed.
     */
    class Authenticator
    implements ClusterClientCallback {
        /** Guards {@code response} and is used to signal its arrival. */
        private final Object monitor = new Object();
        /** The authentication response, or {@code null} until one arrives. Guarded by {@code monitor}. */
        private AuthenticationResponse response = null;

        Authenticator() {
        }

        /**
         * Sends an authentication request and waits for the answer, for at
         * most the request's timeout.
         *
         * <p>On a good response the enclosing client is marked authenticated,
         * receives the node id from the response, and is reinstated as the
         * transport's callback. On timeout, a bad response, or any exception,
         * a log entry is written and this authenticator remains the
         * transport's callback.
         *
         * @return {@code true} if the cluster master accepted the
         *         authentication; {@code false} otherwise
         */
        public boolean authenticate() {
            final ClusterClient cc = ClusterClient.this;
            final ClientTransport theTransport = cc.transport;
            try {
                theTransport.setCallback(this);

                final AuthenticationRequest request = new AuthenticationRequest(ClusterClient.this.getAuthenticationTimeout());
                request.setChannel(ClusterClient.this.getChannel());
                ClusterClient.this.markOutboundMessage(request);
                request.setDeploymentDescription(ClusterClient.this.application.getVersion().getDeploymentDescription());
                request.setKey(cc.getAuthenticationKey());
                request.captureSentTime();
                theTransport.send(request);

                final AuthenticationResponse received = this.awaitResponse(request);
                if (received == null) {
                    cc.getLog().log("Cluster master did not respond to authentication request within timeout of " + request.getRequestTimeout() + "ms");
                    return false;
                }
                if (!received.isGood()) {
                    cc.getLog().log("Cluster master responded that authentication key is bad.");
                    return false;
                }

                cc.getLog().log("Successfully authenticated with cluster master as node " + received.getNodeId() + ".");
                cc.connectedAuthenticated = true;
                cc.setNodeId(received.getNodeId());
                // Hand regular message delivery back to the client.
                theTransport.setCallback(cc);
                return true;
            }
            catch (Exception exc) {
                cc.getLog().log("Exception while attempting to connect or authenticate.", exc);
            }
            return false;
        }

        /**
         * Blocks until a response has been delivered or the request's
         * remaining timeout reaches zero.
         *
         * @param request the request whose remaining timeout bounds the wait
         * @return the response, or {@code null} if none arrived in time
         */
        private AuthenticationResponse awaitResponse(AuthenticationRequest request) {
            synchronized (this.monitor) {
                // Condition is re-checked under the lock on every pass, which covers both an
                // early-arriving response and spurious wakeups.
                while (this.response == null) {
                    final long remaining = request.getRequestTimeoutRemaining();
                    if (remaining <= 0L) {
                        break;
                    }
                    try {
                        this.monitor.wait(remaining);
                    }
                    catch (InterruptedException ignored) {
                        // Kept from the original behaviour: an interrupt does not abort the
                        // handshake; the loop simply re-evaluates response and timeout.
                        // NOTE(review): the interrupt status is not restored here - confirm
                        // that callers do not rely on it.
                    }
                }
                return this.response;
            }
        }

        /**
         * Transport callback used during authentication. Stores an
         * {@code AuthenticationResponse} and wakes the waiting thread; every
         * other message type is ignored.
         *
         * @param message the message delivered by the transport
         */
        @Override
        public void process(Message message) {
            if (message instanceof AuthenticationResponse) {
                synchronized (this.monitor) {
                    this.response = (AuthenticationResponse)message;
                    this.monitor.notifyAll();
                }
            }
        }
    }

    /**
     * Thread that repeatedly drains the enclosing client's inbound queue,
     * sleeping between passes.
     */
    class InboundConsumer
    extends EndableThread {
        /** Value (50) passed to the {@code EndableThread} constructor alongside the thread name. */
        public static final int SLEEP_MS = 50;

        /**
         * Names the thread after the application's name and deployment and
         * passes {@code SLEEP_MS} to the superclass constructor.
         */
        public InboundConsumer() {
            super("Cluster inbound message consumer " + ClusterClient.this.application.getVersion().getNameAndDeployment(), SLEEP_MS);
        }

        /**
         * Loops for as long as {@code checkPause()} returns {@code true}:
         * processes the inbound queue, then sleeps. Exceptions are logged and
         * the loop continues.
         */
        @Override
        public void run() {
            final ClusterClient client = ClusterClient.this;
            while (this.checkPause()) {
                try {
                    client.processInboundQueue();
                    this.simpleSleep();
                }
                catch (Exception exc) {
                    client.getLog().log("Inbound Consumer exception.", exc);
                }
            }
        }
    }

    /**
     * Thread that keeps the enclosing client connected and authenticated and
     * sends its outbound queue. After the main loop ends it keeps sending for
     * two more seconds and then disconnects the transport.
     */
    class OutboundConsumer
    extends EndableThread {
        /** Second constructor argument to {@code EndableThread}; also the sleep used in the shutdown drain loop. */
        public static final int SLEEP_MS = 50;
        /** Third constructor argument to {@code EndableThread}. */
        public static final int MAXIMUM_SLEEP_MS = 3000;
        /** Fourth constructor argument to {@code EndableThread}. */
        public static final int MINIMUM_SLEEP_MS = 50;
        /** Fifth constructor argument to {@code EndableThread}. */
        public static final int SLEEP_ADJUSTMENT_MS = 500;

        /**
         * Names the thread after the application's name and deployment and
         * passes the sleep settings (50, 3000, 50, 500) to the superclass.
         */
        public OutboundConsumer() {
            super("Cluster outbound message consumer " + ClusterClient.this.application.getVersion().getNameAndDeployment(), SLEEP_MS, MAXIMUM_SLEEP_MS, MINIMUM_SLEEP_MS, SLEEP_ADJUSTMENT_MS);
        }

        /**
         * Main loop: while {@code checkPause()} returns {@code true}, make
         * sure the client is connected and authenticated, adjust the sleep
         * (minimum on success, incremented on failure), send the outbound
         * queue and sleep. Exceptions are logged and the loop continues.
         *
         * <p>Shutdown: the transport callback is cleared, the outbound queue
         * is processed repeatedly for 2000 ms, and the transport is
         * disconnected.
         */
        @Override
        public void run() {
            final ClusterClient client = ClusterClient.this;
            // Captured once, so shutdown acts on the transport this thread started with.
            final ClientTransport theTransport = client.transport;

            while (this.checkPause()) {
                try {
                    final boolean ready = client.connectIfNeeded();
                    if (!ready) {
                        this.incrementSleep();
                    } else {
                        this.setMinimumSleep();
                    }
                    client.processOutboundQueue();
                    this.simpleSleep();
                }
                catch (Exception exc) {
                    client.getLog().log("Outbound Consumer exception.", exc);
                }
            }

            // Stop receiving, then give queued messages a bounded window to go out.
            theTransport.setCallback(null);
            final long forceShutdown = System.currentTimeMillis() + 2000L;
            while (System.currentTimeMillis() < forceShutdown) {
                client.processOutboundQueue();
                this.simpleSleep(SLEEP_MS);
            }
            theTransport.disconnect();
        }
    }
}

