package com.mayam.wf.mq.impl;

import com.google.common.annotations.VisibleForTesting;
import com.google.inject.assistedinject.Assisted;
import com.mayam.wf.attributes.annotations.DecConnectionCount;
import com.mayam.wf.attributes.annotations.IncConnectionCount;
import com.mayam.wf.attributes.shared.Attribute;
import com.mayam.wf.attributes.shared.AttributeMap;
import com.mayam.wf.mq.AttributeMessageBuilder;
import com.mayam.wf.mq.Mq;
import com.mayam.wf.mq.MqContentType;
import com.mayam.wf.mq.MqDestination;
import com.mayam.wf.mq.MqException;
import com.mayam.wf.mq.MqMessage;
import com.mayam.wf.mq.RetryMessageException;
import com.mayam.wf.mq.property.MessageType;
import com.mayam.wf.mq.replay.ReplayManager;
import com.mayam.wf.mq.replay.ReplayStore;
import com.mayam.wf.siteconfig.MqConfig;
import com.rabbitmq.client.ConnectionFactory;
import java.net.URL;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Date;
import java.util.Enumeration;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import javax.inject.Inject;
import javax.inject.Provider;
import javax.jms.BytesMessage;
import javax.jms.Connection;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.MapMessage;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.MessageProducer;
import javax.jms.Session;
import javax.jms.TextMessage;
import org.apache.activemq.ActiveMQConnection;
import org.apache.activemq.command.ActiveMQTextMessage;
import org.apache.activemq.util.ByteSequence;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:com/mayam/wf/mq/impl/BaseJmsMq.class */
public abstract class BaseJmsMq implements Mq {
    private static final String TOPIC_MAYAM_PING = "topic://mayam.ping";
    protected static final int MAX_RETRIES = 3;
    private static final long INTENSE_TIMEOUT = 500;
    private static final long INTENSE_TIMEOUT_TRIGGERED = 1;
    private static final long NORMAL_TIMEOUT = 1000;
    private static final long RELAXED_TIMEOUT = 5000;
    private static final long SLEEP_INCREMENT = 200;
    private static final long LISTEN_BASE_FAIL_SLEEP = 100;
    private static final long LISTEN_MAX_FAIL_SLEEP = 5000;
    private final MqConfig mqConfig;
    protected final ReplayManager replayManager;
    protected final Provider<MqMessage> messageProvider;
    private final Provider<AttributeMap> mapProvider;
    private final Provider<AttributeMessageBuilder> msgBuilderProvider;
    private final ThreadLocal<Local> local = new ThreadLocal<Local>() { // from class: com.mayam.wf.mq.impl.BaseJmsMq.1
        /* JADX INFO: Access modifiers changed from: protected */
        /* JADX WARN: Can't rename method to resolve collision */
        @Override // java.lang.ThreadLocal
        public Local initialValue() {
            return new Local();
        }
    };
    private static final Logger logger = LoggerFactory.getLogger((Class<?>) BaseJmsMq.class);
    private static long SEND_FAIL_TIMEOUT = ConnectionFactory.DEFAULT_NETWORK_RECOVERY_INTERVAL;

    /* JADX INFO: Access modifiers changed from: protected */
    /* loaded from: input_file:com/mayam/wf/mq/impl/BaseJmsMq$ListenerContainer.class */
    public class ListenerContainer implements Mq.Detachable {
        protected String destination;
        protected Mq.Listener callback;
        protected MessageConsumer consumer;

        protected ListenerContainer(String str, Mq.Listener listener) {
            this.destination = str;
            this.callback = listener;
        }

        protected MessageConsumer consumer() throws JMSException {
            if (this.consumer == null) {
                this.consumer = BaseJmsMq.this.createConsumer(this.destination);
                BaseJmsMq.logger.debug("Consumer created for " + this.destination);
            }
            return this.consumer;
        }

        protected void shutdownConsumer() {
            if (this.consumer == null) {
                return;
            }
            try {
                this.consumer.close();
            } catch (Throwable th) {
            }
            this.consumer = null;
        }

        @Override // com.mayam.wf.mq.Mq.Detachable
        public void detach() {
            shutdownConsumer();
            BaseJmsMq.this.local.get().listeners.remove(this);
        }

        private void ack(Message message) {
            try {
                message.acknowledge();
            } catch (JMSException e) {
                BaseJmsMq.logger.warn("Failed to acknowledge message to " + this.destination);
            }
        }

        protected void handle(Message message, String str) {
            if (message == null) {
                return;
            }
            try {
                MqMessage createMqMessage = BaseJmsMq.this.createMqMessage(message);
                try {
                    String str2 = createMqMessage.get(MqMessage.PROP_MESSAGE_URL);
                    if (str2 != null) {
                        createMqMessage = BaseJmsMq.this.retrieveFullMessage(new URL(str2));
                    }
                    this.callback.onMessage(createMqMessage);
                    ack(message);
                } catch (Throwable th) {
                    try {
                        BaseJmsMq.this.replayManager.getHandleFailStore(new Date()).store(new ReplayStore.EntryBuilder(str).message(createMqMessage).cause(th).build());
                        if (th instanceof RetryMessageException) {
                            BaseJmsMq.logger.error("Failed to process message {} due to {}/{}. Replay is availble (message not acknowledged)", createMqMessage, th, th.getMessage());
                            BaseJmsMq.logger.error("StackTrace:", th);
                        } else {
                            BaseJmsMq.logger.error("Failed to process message {} due to {}. Replay is availble (message acknowledged)", createMqMessage, th);
                            BaseJmsMq.logger.error("StackTrace:", th);
                            ack(message);
                        }
                    } catch (Throwable th2) {
                        BaseJmsMq.logger.error("Failed to process message {} due to {}. Additionally, replay was made impossible due to {} (mesage not acknowledged)", createMqMessage, th, th2);
                    }
                }
            } catch (MqException e) {
                BaseJmsMq.logger.error("Failed to convert incoming Message {} to MqMessage due to {}. Unable to create replay.", message, e);
            }
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:com/mayam/wf/mq/impl/BaseJmsMq$Local.class */
    public static class Local {
        Connection producerConnection;
        Connection consumerConnection;
        Session producerSession;
        Session consumerSession;
        Map<String, MessageProducer> producers = new HashMap();
        List<ListenerContainer> listeners = new ArrayList();
        boolean consumedLastIteration = false;
        int consecutiveFailures = 0;

        private Local() {
        }
    }

    @Inject
    public BaseJmsMq(ReplayManager replayManager, Provider<MqMessage> provider, Provider<AttributeMap> provider2, Provider<AttributeMessageBuilder> provider3, @Assisted MqConfig mqConfig) {
        this.replayManager = replayManager;
        this.messageProvider = provider;
        this.mapProvider = provider2;
        this.msgBuilderProvider = provider3;
        this.mqConfig = mqConfig;
    }

    @VisibleForTesting
    public static void enableFastSendTimeouts() {
        SEND_FAIL_TIMEOUT = LISTEN_BASE_FAIL_SLEEP;
    }

    @IncConnectionCount("ProducerMQ")
    protected Connection producerConnection() throws JMSException {
        Local local = this.local.get();
        if (local.producerConnection != null && isClosedConnection(local.producerConnection)) {
            logger.error("Closed producer connection detected; forcing cleanup and recovery");
            performProducerShutdown();
        }
        if (local.producerConnection == null) {
            logger.debug("New producer connection");
            local.producerConnection = createConnection(this.mqConfig.getMqUserName(), this.mqConfig.getMqPassword(), this.mqConfig.getMqBrokerUrl());
            local.producerConnection.start();
            if (local.producerConnection instanceof ActiveMQConnection) {
                logger.info("ActiveMQ: created producerConnection {} / {}", ((ActiveMQConnection) local.producerConnection).getTransport(), local.producerConnection);
            } else {
                logger.info("MQ: created producerConnection {}", local.producerConnection);
            }
        }
        return local.producerConnection;
    }

    protected abstract boolean isClosedConnection(Connection connection);

    /* JADX INFO: Access modifiers changed from: protected */
    public Session producerSession() throws JMSException {
        Local local = this.local.get();
        if (local.producerSession != null && isClosedSession(local.producerSession)) {
            logger.error("Closed producer session detected; forcing cleanup and recovery");
            performProducerShutdown();
        }
        if (local.producerSession == null) {
            logger.info("New producer session");
            local.producerSession = producerConnection().createSession(false, 1);
        }
        return local.producerSession;
    }

    protected abstract boolean isClosedSession(Session session);

    protected MessageProducer producer(String str) throws JMSException {
        Local local = this.local.get();
        MessageProducer messageProducer = local.producers.get(str);
        if (messageProducer == null) {
            logger.debug("New producer for {}", str);
            messageProducer = createProducer(str);
            local.producers.put(str, messageProducer);
        }
        return messageProducer;
    }

    @IncConnectionCount("ConsumerMQ")
    protected Connection consumerConnection() throws JMSException {
        Local local = this.local.get();
        if (local.consumerConnection == null) {
            logger.debug("New consumer connection");
            local.consumerConnection = createConnection(this.mqConfig.getMqUserName(), this.mqConfig.getMqPassword(), this.mqConfig.getMqBrokerUrl());
            local.consumerConnection.start();
            if (local.consumerConnection instanceof ActiveMQConnection) {
                logger.info("ActiveMQ: created consumerConnection {} / {}", ((ActiveMQConnection) local.consumerConnection).getTransport(), local.consumerConnection);
            } else {
                logger.info("MQ: created consumerConnection {}", local.consumerConnection);
            }
        }
        return local.consumerConnection;
    }

    /* JADX INFO: Access modifiers changed from: protected */
    public Session consumerSession() throws JMSException {
        Local local = this.local.get();
        if (local.consumerSession == null) {
            logger.info("New consumer session");
            local.consumerSession = consumerConnection().createSession(false, 2);
        }
        return local.consumerSession;
    }

    @Override // com.mayam.wf.mq.Mq
    public void shutdownProducers() {
        performProducerShutdown();
        tryRemoveLocal();
    }

    protected void performProducerShutdown() {
        Local local = this.local.get();
        boolean z = false;
        Iterator<MessageProducer> it = local.producers.values().iterator();
        while (it.hasNext()) {
            try {
                it.next().close();
                z = true;
            } catch (Throwable th) {
                logger.error("ActiveMQ: Unable to close producer", th);
            }
        }
        if (local.producerSession != null) {
            try {
                local.producerSession.close();
                z = true;
            } catch (Throwable th2) {
                logger.error("ActiveMQ: Unable to close producerSession", th2);
            }
        }
        if (local.producerConnection != null) {
            try {
                if (local.producerConnection instanceof ActiveMQConnection) {
                    logger.info("ActiveMQ: about to close producerConnection {} / {}", ((ActiveMQConnection) local.producerConnection).getTransport(), local.producerConnection);
                } else {
                    logger.info("MQ: about to close producerConnection {}", local.producerConnection);
                }
                closeProducerConnection(local.producerConnection);
                z = true;
            } catch (Throwable th3) {
                logger.error("ActiveMQ: Unable to close producerConnection", th3);
            }
        }
        if (z) {
            logger.info("Did shut down producers");
        }
        local.producers.clear();
        local.producerSession = null;
        local.producerConnection = null;
    }

    @DecConnectionCount("ProducerMQ")
    protected void closeProducerConnection(Connection connection) throws JMSException {
        connection.close();
    }

    @DecConnectionCount("ConsumerMQ")
    private void closeConsumerConnection(Connection connection) throws JMSException {
        connection.close();
    }

    @Override // com.mayam.wf.mq.Mq
    public void shutdownConsumers() {
        performConsumerShutdown();
        tryRemoveLocal();
    }

    protected void performConsumerShutdown() {
        Local local = this.local.get();
        boolean z = false;
        Iterator<ListenerContainer> it = local.listeners.iterator();
        while (it.hasNext()) {
            try {
                it.next().shutdownConsumer();
                z = true;
            } catch (Throwable th) {
                logger.error("ActiveMQ: Unable to close consumer", th);
            }
        }
        if (local.consumerSession != null) {
            try {
                local.consumerSession.close();
                z = true;
            } catch (Throwable th2) {
                logger.error("ActiveMQ: Unable to close consumerSession", th2);
            }
        }
        if (local.consumerConnection != null) {
            try {
                if (local.consumerConnection instanceof ActiveMQConnection) {
                    logger.info("ActiveMQ: about to close consumerConnection {} / {}", ((ActiveMQConnection) local.consumerConnection).getTransport(), local.consumerConnection);
                } else {
                    logger.info("MQ: about to close consumerConnection {}", local.consumerConnection);
                }
                closeConsumerConnection(local.consumerConnection);
                z = true;
            } catch (Throwable th3) {
                logger.error("ActiveMQ: Unable to close consumerConnection", th3);
            }
        }
        if (z) {
            logger.info("Did shut down consumers");
        }
        local.consumerSession = null;
        local.consumerConnection = null;
    }

    protected void tryRemoveLocal() {
        Local local = this.local.get();
        if (local.consumerConnection == null && local.producerConnection == null && local.listeners.isEmpty()) {
            this.local.remove();
        }
    }

    protected abstract Connection createConnection(String str, String str2, String str3) throws JMSException;

    protected abstract MessageProducer createProducer(String str) throws JMSException;

    protected abstract MessageConsumer createConsumer(String str) throws JMSException;

    protected TextMessage createTextMessage(MqDestination mqDestination, MqMessage mqMessage) throws MqException {
        try {
            TextMessage createTextMessage = producerSession().createTextMessage();
            if (mqMessage.getType() != null) {
                createTextMessage.setJMSType(mqMessage.getType().type());
            }
            if (mqMessage.get(MqMessage.PROP_ORIGIN_DESTINATION) == null) {
                mqMessage.set(MqMessage.PROP_ORIGIN_DESTINATION, mqDestination.url());
            }
            if (mqMessage.get(MqMessage.PROP_ORIGIN_DATE) == null) {
                mqMessage.set(MqMessage.PROP_ORIGIN_DATE, new Date());
            }
            if (mqMessage.isLarge()) {
                throw new MqException("Large messages doesn't work anymore. Please see MqMessage.java for possible override", null);
            }
            createTextMessage.setText(mqMessage.getContent());
            for (String str : mqMessage.propertyNames()) {
                createTextMessage.setStringProperty(str, mqMessage.get(str));
            }
            return createTextMessage;
        } catch (Throwable th) {
            throw new MqException("Failed to assemble message bound for " + mqDestination.url() + ". No replay available at this early stage.", th);
        }
    }

    protected BytesMessage createByteMessage(MqDestination mqDestination, MqMessage mqMessage) throws MqException {
        try {
            BytesMessage createBytesMessage = producerSession().createBytesMessage();
            if (mqMessage.getType() != null) {
                createBytesMessage.setJMSType(mqMessage.getType().type());
            }
            if (mqMessage.get(MqMessage.PROP_ORIGIN_DESTINATION) == null) {
                mqMessage.set(MqMessage.PROP_ORIGIN_DESTINATION, mqDestination.url());
            }
            if (mqMessage.get(MqMessage.PROP_ORIGIN_DATE) == null) {
                mqMessage.set(MqMessage.PROP_ORIGIN_DATE, new Date());
            }
            createBytesMessage.writeBytes(mqMessage.getByteContent());
            for (String str : mqMessage.propertyNames()) {
                createBytesMessage.setStringProperty(str, mqMessage.get(str));
            }
            return createBytesMessage;
        } catch (Throwable th) {
            throw new MqException("Failed to assemble message bound for " + mqDestination.url() + ". No replay available at this early stage.", th);
        }
    }

    protected MqMessage createMqMessage(Message message) throws MqException {
        try {
            MqMessage mqMessage = this.messageProvider.get();
            if (message.getJMSType() != null && !"".equals(message.getJMSType())) {
                mqMessage.setType(MqContentType.of(message.getJMSType()));
            }
            if (message instanceof TextMessage) {
                try {
                    mqMessage.setContent(((TextMessage) message).getText());
                } catch (Exception e) {
                    if (!(message instanceof ActiveMQTextMessage)) {
                        throw e;
                    }
                    ByteSequence content = ((ActiveMQTextMessage) message).getContent();
                    mqMessage.setContent(new String(content.getData(), content.getOffset() + 4, content.getLength() - 4, StandardCharsets.UTF_8));
                }
            } else if (message instanceof BytesMessage) {
                BytesMessage bytesMessage = (BytesMessage) message;
                byte[] bArr = new byte[(int) bytesMessage.getBodyLength()];
                bytesMessage.readBytes(bArr);
                mqMessage.setByteContent(bArr);
            }
            Enumeration propertyNames = message.getPropertyNames();
            while (propertyNames.hasMoreElements()) {
                String str = (String) propertyNames.nextElement();
                mqMessage.set(str, message.getObjectProperty(str));
            }
            if (message instanceof MapMessage) {
                Enumeration mapNames = ((MapMessage) message).getMapNames();
                while (mapNames.hasMoreElements()) {
                    String str2 = (String) mapNames.nextElement();
                    mqMessage.set(str2, ((MapMessage) message).getObject(str2));
                }
            }
            mqMessage.setLarge(mqMessage.get(MqMessage.PROP_MESSAGE_URL) != null);
            if (mqMessage.get(MqMessage.PROP_ORIGIN_DESTINATION) == null) {
                mqMessage.set(MqMessage.PROP_ORIGIN_DESTINATION, deriveDestinationUrl(message.getJMSDestination()));
            }
            if (mqMessage.get(MqMessage.PROP_ORIGIN_DATE) == null) {
                mqMessage.set(MqMessage.PROP_ORIGIN_DATE, new Date(message.getJMSTimestamp()));
            }
            return mqMessage;
        } catch (JMSException e2) {
            throw new MqException("Failed to convert MqMessage into a JMS TextMessage", e2);
        }
    }

    protected String deriveDestinationUrl(Destination destination) {
        return destination.toString();
    }

    @Override // com.mayam.wf.mq.Mq
    public void send(MqDestination mqDestination, MqMessage mqMessage) throws MqException {
        send(mqDestination, mqMessage, 3);
    }

    public void send(MqDestination mqDestination, MqMessage mqMessage, int i) throws MqException {
        Message createByteMessage = mqMessage.hasByteContent() ? createByteMessage(mqDestination, mqMessage) : createTextMessage(mqDestination, mqMessage);
        String url = mqDestination.url();
        JMSException jMSException = null;
        for (int i2 = 0; i2 < i; i2++) {
            try {
                producer(url).send(createByteMessage);
                return;
            } catch (JMSException e) {
                jMSException = e;
                logger.warn("Failed to send persistent message to " + url + "- attempt " + (1 + i2) + " of " + i + ".");
                performProducerShutdown();
                nonBlockingSleep(SEND_FAIL_TIMEOUT);
            }
        }
        try {
            this.replayManager.getSendFailStore(new Date()).store(new ReplayStore.EntryBuilder(mqDestination).message(mqMessage).cause(jMSException).build());
            logger.error("Failed to send message {} due to {}. Replay is availble.", mqMessage, jMSException);
        } catch (Throwable th) {
            logger.error("Failed to send message {} due to {}. Additionally, replay was made impossible due to {}.", mqMessage, jMSException, th);
        }
        throw new MqException("Failed to send message to " + url, jMSException);
    }

    protected void sendVolatile(String str, Message message) {
        JMSException jMSException = null;
        for (int i = 0; i < 3; i++) {
            try {
                producer(str).send(message);
                return;
            } catch (JMSException e) {
                jMSException = e;
                logger.warn("Failed to send volatile message to " + str + "- attempt " + (1 + i) + " of 3");
                performProducerShutdown();
                nonBlockingSleep(SEND_FAIL_TIMEOUT);
            }
        }
        logger.warn("Failed to send volatile message {} due to {}", message, jMSException);
    }

    @Override // com.mayam.wf.mq.Mq
    public void sendVolatile(MqDestination mqDestination, MqMessage mqMessage) {
        try {
            sendVolatile(mqDestination.url(), mqMessage.hasByteContent() ? createByteMessage(mqDestination, mqMessage) : createTextMessage(mqDestination, mqMessage));
        } catch (MqException e) {
            logger.warn("Failed to convert volatile message meant for " + String.valueOf(mqDestination));
        }
    }

    @Override // com.mayam.wf.mq.Mq
    public void ping() throws MqException {
        AttributeMap attributeMap = this.mapProvider.get();
        attributeMap.setAttribute(Attribute.ERROR_MSG, (Object) "ping");
        MqMessage build = this.msgBuilderProvider.get().subject(attributeMap).build();
        build.setContent("ping");
        build.set(MessageType.PROPERTY_NAME, MessageType.JOB);
        send(MqDestination.of(TOPIC_MAYAM_PING), build, 1);
    }

    @Override // com.mayam.wf.mq.Mq
    public void listen() {
        listen(Mq.ListenIntensity.NORMAL);
    }

    @Override // com.mayam.wf.mq.Mq
    public void listen(Mq.ListenIntensity listenIntensity) {
        long j;
        Local local = this.local.get();
        switch (listenIntensity) {
            case INTENSE:
                j = 500;
                break;
            case RELAXED:
                j = 5000;
                break;
            default:
                j = 1000;
                break;
        }
        if (local.listeners.isEmpty()) {
            logger.warn("Mq.listen called with no attached listeners");
            nonBlockingSleep(j);
            return;
        }
        long size = j / local.listeners.size();
        if (listenIntensity == Mq.ListenIntensity.INTENSE && local.consumedLastIteration) {
            size = 1;
        }
        boolean z = false;
        boolean z2 = false;
        Iterator<ListenerContainer> it = local.listeners.iterator();
        while (true) {
            if (it.hasNext()) {
                ListenerContainer next = it.next();
                try {
                    Message receive = next.consumer().receive(size);
                    if (receive != null) {
                        next.handle(receive, next.destination);
                        z = true;
                        if (listenIntensity == Mq.ListenIntensity.INTENSE) {
                            size = 1;
                        }
                    }
                } catch (Throwable th) {
                    performConsumerShutdown();
                    if (z2) {
                        logger.warn("Problems while receiving message from " + next.destination + "; (" + th.getMessage() + "). Second issue during the same iteration - bailing in case a shutdown is ordered.", th);
                    } else {
                        long min = Math.min(ConnectionFactory.DEFAULT_NETWORK_RECOVERY_INTERVAL, (1 + local.consecutiveFailures) * LISTEN_BASE_FAIL_SLEEP);
                        logger.warn("Problems while receiving message from " + next.destination + "; (" + th.getMessage() + "). Waiting " + min + "ms before resuming.", th);
                        nonBlockingSleep(min);
                        z2 = true;
                    }
                }
            }
        }
        if (z2) {
            local.consecutiveFailures++;
        } else {
            local.consecutiveFailures = 0;
        }
        local.consumedLastIteration = z;
    }

    @Override // com.mayam.wf.mq.Mq
    public Mq.Detachable attachListener(MqDestination mqDestination, Mq.Listener listener) {
        ListenerContainer listenerContainer = new ListenerContainer(mqDestination.url(), listener);
        this.local.get().listeners.add(listenerContainer);
        return listenerContainer;
    }

    protected MqMessage retrieveFullMessage(URL url) throws MqException {
        try {
            MqMessage recreateMessage = this.replayManager.recreateMessage(new ReplayStore(this.replayManager.downloadLargeStore(url)).iterator().next());
            recreateMessage.setLarge(true);
            return recreateMessage;
        } catch (MqException e) {
            throw e;
        } catch (Throwable th) {
            throw new MqException("Failed to retrieve full message from url " + String.valueOf(url), th);
        }
    }

    private void nonBlockingSleep(long j) {
        long j2 = j;
        while (j2 > 0) {
            if (j2 <= SLEEP_INCREMENT) {
                Thread.sleep(j2);
                return;
            }
            try {
                j2 -= SLEEP_INCREMENT;
                Thread.sleep(SLEEP_INCREMENT);
            } catch (InterruptedException e) {
                throw new RuntimeException(e);
            }
            throw new RuntimeException(e);
        }
    }
}
