package com.mayam.wf.mq.impl;

import com.google.inject.assistedinject.Assisted;
import com.mayam.wf.attributes.annotations.DecConnectionCount;
import com.mayam.wf.attributes.annotations.IncConnectionCount;
import com.mayam.wf.mq.Mq;
import com.mayam.wf.mq.MqContentType;
import com.mayam.wf.mq.MqDestination;
import com.mayam.wf.mq.MqException;
import com.mayam.wf.mq.MqMessage;
import com.mayam.wf.mq.RetryMessageException;
import com.mayam.wf.mq.replay.ReplayManager;
import com.mayam.wf.mq.replay.ReplayStore;
import com.mayam.wf.siteconfig.MqConfig;
import com.mayam.wf.siteconfig.ProcessName;
import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.GetResponse;
import com.rabbitmq.client.MessageProperties;
import com.rabbitmq.client.impl.AMQConnection;
import com.rabbitmq.client.impl.recovery.AutorecoveringConnection;
import com.rabbitmq.client.impl.recovery.RecoveryAwareAMQConnection;
import java.io.IOException;
import java.lang.reflect.Field;
import java.net.URL;
import java.util.ArrayList;
import java.util.Date;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import javax.inject.Inject;
import javax.inject.Provider;
import net.bytebuddy.implementation.MethodDelegation;
import org.apache.activemq.command.ActiveMQDestination;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:com/mayam/wf/mq/impl/RabbitMq.class */
public class RabbitMq implements Mq {
    private static final String MAYAM_TOPIC_EXCHANGE = "mayam.topics";
    private static final int MAX_RETRIES = 3;
    private static final long INTENSE_TIMEOUT = 500;
    private static final long INTENSE_TIMEOUT_TRIGGERED = 1;
    private static final long NORMAL_TIMEOUT = 1000;
    private static final long RELAXED_TIMEOUT = 5000;
    private static final long LISTEN_FAIL_TIMEOUT = 100;
    private static final long SLEEP_INCREMENT = 200;
    private final ReplayManager replayManager;
    private final Provider<MqMessage> messageProvider;
    private final String processName;
    private final MqConfig mqConfig;
    private final ThreadLocal<Local> local = new ThreadLocal<Local>() { // from class: com.mayam.wf.mq.impl.RabbitMq.1
        /* JADX INFO: Access modifiers changed from: protected */
        /* JADX WARN: Can't rename method to resolve collision */
        @Override // java.lang.ThreadLocal
        public Local initialValue() {
            return new Local();
        }
    };
    static final Logger logger = LoggerFactory.getLogger((Class<?>) RabbitMq.class);
    private static long SEND_FAIL_TIMEOUT = 5000;

    /* JADX INFO: Access modifiers changed from: protected */
    /* loaded from: input_file:com/mayam/wf/mq/impl/RabbitMq$ListenerContainer.class */
    public class ListenerContainer implements Mq.Detachable {
        private String destination = null;
        private MqDestination mqDestination;
        protected Mq.Listener callback;

        public ListenerContainer(MqDestination mqDestination, Mq.Listener listener) {
            this.mqDestination = mqDestination;
            this.callback = listener;
        }

        private String getDestination(Channel channel) throws MqException {
            if (this.destination == null) {
                this.destination = RabbitMq.this.createDestination(channel, this.mqDestination, true);
            }
            return this.destination;
        }

        private void removeDestination() {
            this.destination = null;
        }

        @Override // com.mayam.wf.mq.Mq.Detachable
        public void detach() {
            RabbitMq.this.shutdownConsumers();
            RabbitMq.this.local.get().listeners.remove(this);
        }

        private void ack(GetResponse getResponse) {
            try {
                RabbitMq.this.consumerChannel().basicAck(getResponse.getEnvelope().getDeliveryTag(), false);
            } catch (MqException | IOException e) {
                RabbitMq.logger.warn("Failed to acknowledge message to " + this.destination);
            }
        }

        protected void handle(GetResponse getResponse) {
            if (getResponse == null) {
                return;
            }
            MqMessage createMqMessage = RabbitMq.this.createMqMessage(getResponse);
            try {
                String str = createMqMessage.get(MqMessage.PROP_MESSAGE_URL);
                if (str != null) {
                    createMqMessage = RabbitMq.this.retrieveFullMessage(new URL(str));
                }
                this.callback.onMessage(createMqMessage);
                ack(getResponse);
            } catch (Throwable th) {
                try {
                    RabbitMq.this.replayManager.getHandleFailStore(new Date()).store(new ReplayStore.EntryBuilder(this.destination).message(createMqMessage).cause(th).build());
                    if (th instanceof RetryMessageException) {
                        RabbitMq.logger.error("Failed to process message {} due to {}/{}. Replay is availble (message not acknowledged)", createMqMessage, th, th.getMessage());
                        RabbitMq.logger.error("StackTrace:", th);
                    } else {
                        RabbitMq.logger.error("Failed to process message {} due to {}. Replay is availble (message acknowledged)", createMqMessage, th);
                        RabbitMq.logger.error("StackTrace:", th);
                        ack(getResponse);
                    }
                } catch (Throwable th2) {
                    RabbitMq.logger.error("Failed to process message {} due to {}. Additionally, replay was made impossible due to {} (mesage not acknowledged)", createMqMessage, th, th2);
                }
            }
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:com/mayam/wf/mq/impl/RabbitMq$Local.class */
    public static class Local {
        Connection consumerConnection;
        Connection producerConnection;
        Channel producerChannel;
        Channel consumerChannel;
        List<ListenerContainer> listeners = new ArrayList();
        boolean consumedLastIteration = false;

        private Local() {
        }
    }

    @Inject
    public RabbitMq(ReplayManager replayManager, Provider<MqMessage> provider, @ProcessName String str, @Assisted MqConfig mqConfig) {
        this.replayManager = replayManager;
        this.messageProvider = provider;
        this.processName = str;
        this.mqConfig = mqConfig;
    }

    private Connection createConnection(String str, String str2, String str3) throws MqException {
        try {
            String str4 = System.getenv().get("TASKS_DISABLE_RABBITMQ_AUTORECOVERY");
            ConnectionFactory connectionFactory = new ConnectionFactory();
            connectionFactory.setUri(str3);
            connectionFactory.setUsername(str);
            connectionFactory.setPassword(str2);
            if (str4 != null) {
                logger.info("Disabling rabbitmq autorecovery, since TASKS_DISABLE_RABBITMQ_AUTORECOVERY is specified");
                connectionFactory.setAutomaticRecoveryEnabled(false);
            }
            Connection newConnection = connectionFactory.newConnection();
            if (str4 == null) {
                try {
                    Field declaredField = AutorecoveringConnection.class.getDeclaredField(MethodDelegation.ImplementationDelegate.FIELD_NAME_PREFIX);
                    declaredField.setAccessible(true);
                    RecoveryAwareAMQConnection recoveryAwareAMQConnection = (RecoveryAwareAMQConnection) declaredField.get(newConnection);
                    Field declaredField2 = AMQConnection.class.getDeclaredField("mainLoopThread");
                    declaredField2.setAccessible(true);
                    Thread thread = (Thread) declaredField2.get(recoveryAwareAMQConnection);
                    thread.setName(thread.getName() + " [" + this.processName + "/" + Thread.currentThread().getName() + "]");
                } catch (Exception e) {
                }
            }
            return newConnection;
        } catch (Exception e2) {
            logger.error("Unable to create connection", (Throwable) e2);
            throw new MqException("Unable to create connection", e2);
        }
    }

    @Override // com.mayam.wf.mq.Mq
    public void send(MqDestination mqDestination, MqMessage mqMessage) throws MqException {
        send(mqDestination, mqMessage, 3, false);
    }

    private void send(MqDestination mqDestination, MqMessage mqMessage, int i, boolean z) throws MqException {
        Exception exc = null;
        String url = mqDestination.url();
        for (int i2 = 0; i2 < i; i2++) {
            try {
                Channel producerChannel = producerChannel();
                String createDestination = createDestination(producerChannel, mqDestination);
                if (url.startsWith(ActiveMQDestination.TOPIC_QUALIFIED_PREFIX)) {
                    producerChannel.basicPublish(MAYAM_TOPIC_EXCHANGE, createDestination, createMessageProperties(mqMessage, mqDestination), stringToBytes(mqMessage.getContent()));
                    return;
                }
                if (!url.startsWith("produce://")) {
                    producerChannel.basicPublish("", createDestination, createMessageProperties(mqMessage, mqDestination), stringToBytes(mqMessage.getContent()));
                    return;
                }
                AMQP.BasicProperties createMessageProperties = createMessageProperties(mqMessage, mqDestination);
                MqDestination.ProducerArguments producerArguments = mqDestination.getProducerArguments();
                producerChannel.exchangeDeclare(producerArguments.getExchangeName(), exchangeTypeFromString(producerArguments.getExchangeType()), true);
                producerChannel.basicPublish(producerArguments.getExchangeName(), producerArguments.getRoutingKey(), createMessageProperties, stringToBytes(mqMessage.getContent()));
                return;
            } catch (Exception e) {
                exc = e;
                logger.warn("Failed to send persistent message to " + url + "- attempt " + (1 + i2) + " of " + i + ".");
                try {
                    shutdownProducers();
                } catch (Exception e2) {
                }
                nonBlockingSleep(SEND_FAIL_TIMEOUT);
            }
        }
        if (z) {
            return;
        }
        try {
            this.replayManager.getSendFailStore(new Date()).store(new ReplayStore.EntryBuilder(mqDestination).message(mqMessage).cause(exc).build());
            logger.error("Failed to send message {} due to {}. Replay is availble.", mqMessage, exc);
        } catch (Throwable th) {
            logger.error("Failed to send message {} due to {}. Additionally, replay was made impossible due to {}.", mqMessage, exc, th);
        }
        throw new MqException("Failed to send message to " + url, exc);
    }

    private byte[] stringToBytes(String str) {
        if (str == null) {
            return null;
        }
        return str.getBytes();
    }

    private AMQP.BasicProperties createMessageProperties(MqMessage mqMessage, MqDestination mqDestination) throws IOException {
        AMQP.BasicProperties basicProperties = MessageProperties.PERSISTENT_BASIC;
        Map<String, Object> headers = basicProperties.getHeaders();
        if (headers == null) {
            headers = new HashMap();
        }
        if (mqMessage.getType() != null) {
            mqMessage.set(MqMessage.PROP_MQ_MESSAGE_TYPE, mqMessage.getType().type());
        }
        if (mqMessage.get(MqMessage.PROP_ORIGIN_DESTINATION) == null) {
            mqMessage.set(MqMessage.PROP_ORIGIN_DESTINATION, mqDestination.url());
        }
        if (mqMessage.get(MqMessage.PROP_ORIGIN_DATE) == null) {
            mqMessage.set(MqMessage.PROP_ORIGIN_DATE, new Date());
        }
        for (String str : mqMessage.propertyNames()) {
            headers.put(str, mqMessage.get(str));
        }
        return basicProperties.builder().headers(headers).build();
    }

    private MqMessage createMqMessage(GetResponse getResponse) {
        MqMessage mqMessage = this.messageProvider.get();
        AMQP.BasicProperties props = getResponse.getProps();
        Map<String, Object> headers = props == null ? null : props.getHeaders();
        if (headers != null) {
            for (Map.Entry<String, Object> entry : headers.entrySet()) {
                mqMessage.set(entry.getKey(), entry.getValue().toString());
            }
            Object obj = headers.get(MqMessage.PROP_MQ_MESSAGE_TYPE);
            if (obj != null) {
                mqMessage.setType(MqContentType.of(obj.toString()));
            }
        }
        mqMessage.set(MqMessage.PROP_MESSAGE_ROUTING_KEY, getResponse.getEnvelope().getRoutingKey());
        mqMessage.set(MqMessage.PROP_MESSAGE_EXCHANGE_NAME, getResponse.getEnvelope().getExchange());
        mqMessage.setContent(new String(getResponse.getBody()));
        mqMessage.setLarge(mqMessage.get(MqMessage.PROP_MESSAGE_URL) != null);
        return mqMessage;
    }

    @Override // com.mayam.wf.mq.Mq
    public void sendVolatile(MqDestination mqDestination, MqMessage mqMessage) {
        try {
            send(mqDestination, mqMessage, 3, true);
        } catch (MqException e) {
        }
    }

    @Override // com.mayam.wf.mq.Mq
    public void listen() {
        listen(Mq.ListenIntensity.NORMAL);
    }

    @Override // com.mayam.wf.mq.Mq
    public void listen(Mq.ListenIntensity listenIntensity) {
        long j;
        Local local = this.local.get();
        switch (listenIntensity) {
            case INTENSE:
                j = 500;
                break;
            case RELAXED:
                j = 5000;
                break;
            default:
                j = 1000;
                break;
        }
        if (local.listeners.isEmpty()) {
            logger.warn("Mq.listen called with no attached listeners");
            nonBlockingSleep(j);
            return;
        }
        long size = j / local.listeners.size();
        if (listenIntensity == Mq.ListenIntensity.INTENSE && local.consumedLastIteration) {
            size = 1;
        }
        boolean z = false;
        boolean z2 = false;
        Iterator<ListenerContainer> it = local.listeners.iterator();
        while (true) {
            if (it.hasNext()) {
                ListenerContainer next = it.next();
                try {
                    Channel consumerChannel = consumerChannel();
                    GetResponse basicGet = consumerChannel.basicGet(next.getDestination(consumerChannel), false);
                    if (basicGet == null) {
                        nonBlockingSleep(size);
                    } else {
                        next.handle(basicGet);
                        z = true;
                        if (listenIntensity == Mq.ListenIntensity.INTENSE) {
                            size = 1;
                        }
                    }
                } catch (Throwable th) {
                    next.removeDestination();
                    if (z2) {
                        logger.warn("Problems while receiving message from " + String.valueOf(next.mqDestination) + "; (" + th.getMessage() + "). Second issue during the same iteration - bailing in case a shutdown is ordered.", th);
                    } else {
                        logger.warn("Problems while receiving message from " + String.valueOf(next.mqDestination) + "; (" + th.getMessage() + "). Waiting 100ms before resuming.", th);
                        nonBlockingSleep(LISTEN_FAIL_TIMEOUT);
                        z2 = true;
                    }
                }
            }
        }
        local.consumedLastIteration = z;
    }

    @Override // com.mayam.wf.mq.Mq
    public Mq.Detachable attachListener(MqDestination mqDestination, Mq.Listener listener) {
        ListenerContainer listenerContainer = new ListenerContainer(mqDestination, listener);
        this.local.get().listeners.add(listenerContainer);
        return listenerContainer;
    }

    @Override // com.mayam.wf.mq.Mq
    public void shutdownProducers() {
        Local local = this.local.get();
        boolean z = false;
        if (local.producerChannel != null) {
            try {
                logger.info("MQ: about to close producerChannel {}", local.producerChannel);
                local.producerChannel.close();
                z = true;
            } catch (Throwable th) {
                logger.error("MQ: Unable to close producerSession", th);
            }
        }
        if (local.producerConnection != null) {
            try {
                logger.info("MQ: about to close producerConnection {}", local.producerConnection);
                closeProducerConnection(local.producerConnection);
                z = true;
            } catch (Throwable th2) {
                logger.error("MQ: Unable to close producerConnection", th2);
            }
        }
        if (z) {
            logger.info("MQ: Did shut down producers");
        }
        local.producerChannel = null;
        local.producerConnection = null;
    }

    @Override // com.mayam.wf.mq.Mq
    public void shutdownConsumers() {
        Local local = this.local.get();
        boolean z = false;
        if (local.consumerChannel != null) {
            try {
                logger.info("MQ: about to close consumerChannel {}", local.producerChannel);
                local.consumerChannel.close();
                z = true;
            } catch (Throwable th) {
                logger.error("MQ: Unable to close consumerSession", th);
            }
        }
        if (local.consumerConnection != null) {
            try {
                logger.info("MQ: about to close consumerConnection {}", local.consumerConnection);
                closeConsumerConnection(local.consumerConnection);
                z = true;
            } catch (Throwable th2) {
                logger.error("MQ: Unable to close consumerConnection", th2);
            }
        }
        if (z) {
            logger.info("MQ: Did shut down consumers");
        }
        local.consumerChannel = null;
        local.consumerConnection = null;
    }

    @Override // com.mayam.wf.mq.Mq
    public void ping() throws MqException {
    }

    private String createDestination(Channel channel, MqDestination mqDestination) throws MqException {
        return createDestination(channel, mqDestination, false);
    }

    private String createDestination(Channel channel, MqDestination mqDestination, boolean z) throws MqException {
        String queue;
        String url = mqDestination.url();
        try {
            if (url.startsWith(ActiveMQDestination.TOPIC_QUALIFIED_PREFIX)) {
                if (!z) {
                    channel.exchangeDeclare(MAYAM_TOPIC_EXCHANGE, BuiltinExchangeType.TOPIC, true);
                    return url.substring(8);
                }
                channel.exchangeDeclare(MAYAM_TOPIC_EXCHANGE, BuiltinExchangeType.TOPIC, true);
                String queue2 = channel.queueDeclare().getQueue();
                channel.queueBind(queue2, MAYAM_TOPIC_EXCHANGE, url.substring(8));
                return queue2;
            }
            if (url.startsWith(ActiveMQDestination.QUEUE_QUALIFIED_PREFIX)) {
                String substring = url.substring(8);
                channel.queueDeclare(substring, true, false, false, null);
                return substring;
            }
            if (!url.startsWith("consume://")) {
                if (url.startsWith("produce://")) {
                    if (z) {
                        throw new MqException("produce:// url used for consumer", null);
                    }
                    MqDestination.ProducerArguments producerArguments = mqDestination.getProducerArguments();
                    channel.exchangeDeclare(producerArguments.getExchangeName(), exchangeTypeFromString(producerArguments.getExchangeType()), true);
                }
                return null;
            }
            if (!z) {
                throw new MqException("consume:// url used for producer", null);
            }
            MqDestination.ConsumerArguments consumerArguments = mqDestination.getConsumerArguments();
            channel.exchangeDeclare(consumerArguments.getExchangeName(), exchangeTypeFromString(consumerArguments.getExchangeType()), true);
            if (consumerArguments.getQueue() != null) {
                channel.queueDeclare(consumerArguments.getQueue(), true, false, false, null);
                queue = consumerArguments.getQueue();
            } else {
                queue = channel.queueDeclare().getQueue();
            }
            Iterator<String> it = consumerArguments.getBindingKeys().iterator();
            while (it.hasNext()) {
                channel.queueBind(queue, consumerArguments.getExchangeName(), it.next());
            }
            return queue;
        } catch (IOException e) {
            String str = "Unable to create queue / exchange " + url;
            logger.error(str, (Throwable) e);
            throw new MqException(str, e);
        }
    }

    private BuiltinExchangeType exchangeTypeFromString(String str) {
        BuiltinExchangeType builtinExchangeType = BuiltinExchangeType.DIRECT;
        if ("topic".equals(str)) {
            builtinExchangeType = BuiltinExchangeType.TOPIC;
        } else if ("fanout".equals(str)) {
            builtinExchangeType = BuiltinExchangeType.FANOUT;
        }
        return builtinExchangeType;
    }

    @IncConnectionCount("ConsumerMQ")
    protected Connection consumerConnection() throws MqException {
        Local local = this.local.get();
        if (local.consumerConnection == null || !local.consumerConnection.isOpen()) {
            logger.debug("New consumer connection");
            local.consumerConnection = createConnection(this.mqConfig.getMqUserName(), this.mqConfig.getMqPassword(), this.mqConfig.getMqBrokerUrl());
            logger.info("MQ: created consumerConnection {}", local.consumerConnection);
        }
        return local.consumerConnection;
    }

    @IncConnectionCount("ProducerMQ")
    protected Connection producerConnection() throws MqException {
        Local local = this.local.get();
        if (local.producerConnection == null) {
            logger.debug("New producer connection");
            local.producerConnection = createConnection(this.mqConfig.getMqUserName(), this.mqConfig.getMqPassword(), this.mqConfig.getMqBrokerUrl());
            logger.info("MQ: created producerConnection {}", local.producerConnection);
        }
        return local.producerConnection;
    }

    protected Channel producerChannel() throws MqException {
        Local local = this.local.get();
        if (local.producerChannel == null) {
            logger.debug("New producer channel");
            try {
                local.producerChannel = producerConnection().createChannel();
                logger.info("MQ: created producerChannel {}", local.producerChannel);
            } catch (Exception e) {
                logger.error("Unable to create channel for producer", (Throwable) e);
                throw new MqException("Unable to create channel for producer", e);
            }
        }
        return local.producerChannel;
    }

    protected Channel consumerChannel() throws MqException {
        Local local = this.local.get();
        if (local.consumerChannel == null || !local.consumerConnection.isOpen()) {
            logger.debug("New consumer channel");
            try {
                local.consumerChannel = consumerConnection().createChannel();
                logger.info("MQ: created consumerChannel {}", local.consumerChannel);
            } catch (IOException e) {
                logger.error("Unable to create channel for consumer", (Throwable) e);
                throw new MqException("Unable to create channel for consumer", e);
            }
        }
        return local.consumerChannel;
    }

    @DecConnectionCount("ProducerMQ")
    private void closeProducerConnection(Connection connection) throws IOException {
        connection.close();
    }

    @DecConnectionCount("ConsumerMQ")
    private void closeConsumerConnection(Connection connection) throws IOException {
        connection.close();
    }

    protected MqMessage retrieveFullMessage(URL url) throws MqException {
        try {
            MqMessage recreateMessage = this.replayManager.recreateMessage(new ReplayStore(this.replayManager.downloadLargeStore(url)).iterator().next());
            recreateMessage.setLarge(true);
            return recreateMessage;
        } catch (MqException e) {
            throw e;
        } catch (Throwable th) {
            throw new MqException("Failed to retrieve full message from url " + String.valueOf(url), th);
        }
    }

    private void nonBlockingSleep(long j) {
        long j2 = j;
        while (j2 > 0) {
            if (j2 <= SLEEP_INCREMENT) {
                Thread.sleep(j2);
                return;
            }
            try {
                j2 -= SLEEP_INCREMENT;
                Thread.sleep(SLEEP_INCREMENT);
            } catch (InterruptedException e) {
                throw new RuntimeException(e);
            }
            throw new RuntimeException(e);
        }
    }
}
