/*
 * Decompiled with CFR 0.152.
 */
package org.apache.qpid.jms.provider.amqp;

import java.util.ArrayList;
import java.util.ListIterator;
import java.util.concurrent.ScheduledFuture;
import org.apache.qpid.jms.JmsDestination;
import org.apache.qpid.jms.message.JmsInboundMessageDispatch;
import org.apache.qpid.jms.message.JmsMessage;
import org.apache.qpid.jms.meta.JmsConsumerId;
import org.apache.qpid.jms.meta.JmsConsumerInfo;
import org.apache.qpid.jms.provider.AsyncResult;
import org.apache.qpid.jms.provider.ProviderConstants;
import org.apache.qpid.jms.provider.ProviderException;
import org.apache.qpid.jms.provider.ProviderListener;
import org.apache.qpid.jms.provider.WrappedAsyncResult;
import org.apache.qpid.jms.provider.amqp.AmqpAbstractResource;
import org.apache.qpid.jms.provider.amqp.AmqpConnection;
import org.apache.qpid.jms.provider.amqp.AmqpProvider;
import org.apache.qpid.jms.provider.amqp.AmqpSession;
import org.apache.qpid.jms.provider.amqp.AmqpSubscriptionTracker;
import org.apache.qpid.jms.provider.amqp.AmqpSupport;
import org.apache.qpid.jms.provider.amqp.message.AmqpCodec;
import org.apache.qpid.jms.provider.exceptions.ProviderExceptionSupport;
import org.apache.qpid.jms.provider.exceptions.ProviderOperationTimedOutException;
import org.apache.qpid.proton.amqp.Binary;
import org.apache.qpid.proton.amqp.messaging.Accepted;
import org.apache.qpid.proton.amqp.messaging.Released;
import org.apache.qpid.proton.amqp.transport.DeliveryState;
import org.apache.qpid.proton.engine.Delivery;
import org.apache.qpid.proton.engine.Receiver;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class AmqpConsumer
extends AmqpAbstractResource<JmsConsumerInfo, Receiver> {
    private static final Logger LOG = LoggerFactory.getLogger(AmqpConsumer.class);
    private static final int INDIVIDUAL_ACKNOWLEDGE = 101;
    protected final AmqpSession session;
    protected final int acknowledgementMode;
    protected AsyncResult stopRequest;
    protected AsyncResult pullRequest;
    protected long incomingSequence;
    protected int deliveredCount;
    protected int dispatchedCount;
    protected boolean deferredClose;

    /**
     * Creates a consumer resource bound to the given session and receiver link.
     *
     * @param session  the session that owns this consumer; also passed to the superclass as parent
     * @param info     the consumer description; its acknowledgement mode is cached on this instance
     * @param receiver the receiver endpoint this consumer wraps
     */
    public AmqpConsumer(AmqpSession session, JmsConsumerInfo info, Receiver receiver) {
        super(info, receiver, session);
        this.session = session;
        this.acknowledgementMode = info.getAcknowledgementMode();
    }

    /**
     * Closes this consumer, deferring the actual close when work is still outstanding.
     *
     * Recovered messages that were never delivered again are dispositioned first. If
     * {@link #shouldDeferClose()} reports outstanding work, the consumer is only flagged as
     * deferred-closed and stopped (releasing prefetch once the stop succeeds); otherwise the
     * close is delegated straight to the superclass.
     *
     * @param request the request to complete once the close (or the deferring stop) finishes
     */
    @Override
    public void close(AsyncResult request) {
        acknowledgeUndeliveredRecoveredMessages();

        if (!shouldDeferClose()) {
            super.close(request);
            return;
        }

        deferredClose = true;
        stop(new StopAndReleaseRequest(request));
    }

    /**
     * Sends a MODIFIED_FAILED disposition for every delivery whose envelope is flagged as
     * recovered but has not been delivered again.
     *
     * Only runs for acknowledgement modes 1, 2, 3 and {@link #INDIVIDUAL_ACKNOWLEDGE}; the
     * numeric values presumably correspond to the JMS Session AUTO / CLIENT / DUPS_OK
     * acknowledge constants (inlined by the compiler) - TODO confirm against the JMS API in use.
     */
    private void acknowledgeUndeliveredRecoveredMessages() {
        boolean ackModeTracksRecovery = acknowledgementMode == 2
            || acknowledgementMode == 1
            || acknowledgementMode == 3
            || acknowledgementMode == INDIVIDUAL_ACKNOWLEDGE;
        if (!ackModeTracksRecovery) {
            return;
        }

        Delivery delivery = getEndpoint().head();
        while (delivery != null) {
            // Advance BEFORE handling: handleDisposition() settles the delivery, and asking a
            // settled delivery for next() can end the walk early. This mirrors the iteration
            // used by acknowledge(ACK_TYPE).
            Delivery current = delivery;
            delivery = delivery.next();

            if (!(current.getContext() instanceof JmsInboundMessageDispatch)) {
                continue;
            }

            JmsInboundMessageDispatch envelope = (JmsInboundMessageDispatch) current.getContext();
            if (envelope.isRecovered() && !envelope.isDelivered()) {
                handleDisposition(envelope, current, AmqpSupport.MODIFIED_FAILED);
            }
        }
    }

    /**
     * Starts the consumer by granting link credit, then completes the request.
     *
     * A listener-based consumer configured with a prefetch of zero gets the dedicated
     * no-prefetch credit handling; every other consumer uses the regular prefetch top-up.
     *
     * @param request the request to signal as successful once credit handling has run
     */
    public void start(AsyncResult request) {
        final JmsConsumerInfo info = getResourceInfo();
        final boolean zeroPrefetchListener = info.isListener() && info.getPrefetchSize() == 0;

        if (zeroPrefetchListener) {
            sendFlowForNoPrefetchListener();
        } else {
            sendFlowIfNeeded();
        }

        request.onSuccess();
    }

    /**
     * Stops the consumer, completing {@code request} once no remote credit remains and no
     * deliveries are queued locally.
     *
     * <ul>
     *   <li>Remote credit outstanding: the request is recorded and the receiver is drained.</li>
     *   <li>No credit and nothing queued: the request succeeds immediately.</li>
     *   <li>No credit but deliveries still queued: the request is recorded until they are
     *       processed.</li>
     * </ul>
     *
     * When a positive drain timeout is configured, a timeout task is scheduled and the recorded
     * request is wrapped in a {@link ScheduledRequest} so completion cancels that task.
     *
     * @param request the request to complete when the consumer has stopped
     */
    public void stop(AsyncResult request) {
        final Receiver receiver = getEndpoint();

        if (receiver.getRemoteCredit() > 0) {
            // Credit is still outstanding on the remote: record the request, then drain.
            stopRequest = request;
            receiver.drain(0);
            if (getDrainTimeout() > 0) {
                ScheduledFuture<?> timeoutTask = scheduleStopTimeout(
                    "Consumer {} drain request timed out",
                    "Remote did not respond to a drain request in time");
                stopRequest = new ScheduledRequest(timeoutTask, stopRequest);
            }
            return;
        }

        if (receiver.getQueued() == 0) {
            // Nothing left to wait for.
            request.onSuccess();
            return;
        }

        // No credit, but queued deliveries still need to be processed.
        if (getDrainTimeout() > 0) {
            ScheduledFuture<?> timeoutTask = scheduleStopTimeout(
                "Consumer {} stop timed out awaiting message processing",
                "Consumer stop timed out awaiting message processing");
            stopRequest = new ScheduledRequest(timeoutTask, request);
        } else {
            stopRequest = request;
        }
        LOG.trace("Consumer {} stop awaiting queued delivery processing", getConsumerId());
    }

    /**
     * Schedules the task that fires when a stop does not complete within the drain timeout.
     *
     * If the consumer takes part in an active transaction only the pending stop request is
     * failed; otherwise the whole consumer resource is closed with the timeout as cause and
     * the provider is asked to pump the transport.
     *
     * @param traceMessage   trace log pattern (one placeholder for the consumer id)
     * @param timeoutMessage message of the timeout exception handed to the failure path
     * @return the future of the scheduled timeout task
     */
    private ScheduledFuture<?> scheduleStopTimeout(String traceMessage, String timeoutMessage) {
        return getSession().schedule(() -> {
            LOG.trace(traceMessage, getConsumerId());
            ProviderOperationTimedOutException cause = new ProviderOperationTimedOutException(timeoutMessage);
            if (session.isTransacted() && session.getTransactionContext().isInTransaction(getConsumerId())) {
                stopRequest.onFailure(cause);
                stopRequest = null;
            } else {
                closeResource(session.getProvider(), cause, false);
                session.getProvider().pumpToProtonTransport();
            }
        }, getDrainTimeout());
    }

    /**
     * Schedules a {@link #stop(AsyncResult)} to run after {@code timeout} and records the
     * request so that earlier completion cancels the scheduled task.
     *
     * @param timeout delay handed to the session scheduler before the stop runs
     * @param request the request to complete when the consumer has stopped
     */
    private void stopOnSchedule(long timeout, AsyncResult request) {
        LOG.trace("Consumer {} scheduling stop", getConsumerId());

        Runnable delayedStop = () -> {
            LOG.trace("Consumer {} running scheduled stop", getConsumerId());
            stop(request);
            session.getProvider().pumpToProtonTransport(request);
        };

        ScheduledFuture<?> pendingStop = getSession().schedule(delayedStop, timeout);
        stopRequest = new ScheduledRequest(pendingStop, request);
    }

    /**
     * Reacts to a flow update on the link: completes a pending stop request and/or a pending
     * pull request once the receiver reports no remote credit and no queued deliveries, then
     * delegates to the superclass.
     *
     * The receiver state is re-read for each request because completing the first one may
     * change it.
     *
     * @param provider the provider passed through to the superclass handling
     * @throws ProviderException if the superclass handling fails
     */
    @Override
    public void processFlowUpdates(AmqpProvider provider) throws ProviderException {
        if (stopRequest != null) {
            Receiver link = getEndpoint();
            if (link.getRemoteCredit() <= 0 && link.getQueued() == 0) {
                stopRequest.onSuccess();
                stopRequest = null;
            }
        }

        if (pullRequest != null) {
            Receiver link = getEndpoint();
            if (link.getRemoteCredit() <= 0 && link.getQueued() == 0) {
                pullRequest.onSuccess();
                pullRequest = null;
            }
        }

        LOG.trace("Consumer {} flow updated, remote credit = {}", getConsumerId(), getEndpoint().getRemoteCredit());
        super.processFlowUpdates(provider);
    }

    /**
     * Applies a session-level acknowledgement to the deliveries held by this consumer's receiver.
     *
     * For {@code SESSION_SHUTDOWN}, every envelope that is delivered or recovered is settled
     * with MODIFIED_FAILED. For all other types only delivered envelopes are settled, using the
     * outcome that corresponds to {@code ackType}. Deliveries whose context is not a
     * {@link JmsInboundMessageDispatch} are logged and skipped. A deferred close is attempted
     * after the walk.
     *
     * @param ackType the acknowledgement type to apply
     * @throws IllegalArgumentException if a delivered envelope is found and {@code ackType}
     *         has no matching outcome
     */
    public void acknowledge(ProviderConstants.ACK_TYPE ackType) {
        LOG.trace("Session Acknowledge for consumer {} with ack type {}", getResourceInfo().getId(), ackType);

        Delivery delivery = getEndpoint().head();
        while (delivery != null) {
            // Advance first: the current delivery may be settled below.
            Delivery current = delivery;
            delivery = delivery.next();

            if (!(current.getContext() instanceof JmsInboundMessageDispatch)) {
                LOG.debug("{} Found incomplete delivery with no context during session acknowledge processing", this);
                continue;
            }

            JmsInboundMessageDispatch envelope = (JmsInboundMessageDispatch) current.getContext();
            if (ackType == ProviderConstants.ACK_TYPE.SESSION_SHUTDOWN && (envelope.isDelivered() || envelope.isRecovered())) {
                handleDisposition(envelope, current, AmqpSupport.MODIFIED_FAILED);
                continue;
            }
            if (!envelope.isDelivered()) {
                continue;
            }

            // Typed as DeliveryState: the outcomes assigned below are not all Accepted instances.
            final DeliveryState disposition;
            switch (ackType) {
                case ACCEPTED: {
                    disposition = Accepted.getInstance();
                    break;
                }
                case RELEASED: {
                    disposition = Released.getInstance();
                    break;
                }
                case REJECTED: {
                    disposition = AmqpSupport.REJECTED;
                    break;
                }
                case MODIFIED_FAILED: {
                    disposition = AmqpSupport.MODIFIED_FAILED;
                    break;
                }
                case MODIFIED_FAILED_UNDELIVERABLE: {
                    disposition = AmqpSupport.MODIFIED_FAILED_UNDELIVERABLE;
                    break;
                }
                default: {
                    throw new IllegalArgumentException("Invalid acknowledgement type specified: " + ackType);
                }
            }
            handleDisposition(envelope, current, disposition);
        }

        tryCompleteDeferredClose();
    }

    /**
     * Acknowledges a single message using the delivery stored as the envelope's provider hint.
     *
     * If the hint is not a {@link Delivery} a warning is logged and nothing else happens.
     * After a successful acknowledgement, credit is topped up if needed and a deferred close
     * is attempted.
     *
     * @param envelope the dispatched message being acknowledged
     * @param ackType  the acknowledgement type to apply
     * @throws IllegalArgumentException if {@code ackType} is not supported here
     */
    public void acknowledge(JmsInboundMessageDispatch envelope, ProviderConstants.ACK_TYPE ackType) {
        final Object hint = envelope.getProviderHint();
        if (!(hint instanceof Delivery)) {
            LOG.warn("Received Ack for unknown message: {}", envelope);
            return;
        }
        final Delivery delivery = (Delivery) hint;

        switch (ackType) {
            case DELIVERED:
                handleDelivered(envelope, delivery);
                break;
            case ACCEPTED:
                handleAccepted(envelope, delivery);
                break;
            case REJECTED:
                handleDisposition(envelope, delivery, AmqpSupport.REJECTED);
                break;
            case RELEASED:
                handleDisposition(envelope, delivery, Released.getInstance());
                break;
            case MODIFIED_FAILED:
                handleDisposition(envelope, delivery, AmqpSupport.MODIFIED_FAILED);
                break;
            case MODIFIED_FAILED_UNDELIVERABLE:
                handleDisposition(envelope, delivery, AmqpSupport.MODIFIED_FAILED_UNDELIVERABLE);
                break;
            default:
                LOG.warn("Unsupported Ack Type for message: {}", envelope);
                throw new IllegalArgumentException("Unknown Acknowledgement type");
        }

        sendFlowIfNeeded();
        tryCompleteDeferredClose();
    }

    /**
     * Records that a message has been handed to the application: bumps the delivered count,
     * clears the recovered flag, sets the delivered flag and makes MODIFIED_FAILED the
     * delivery's default outcome.
     *
     * @param envelope the dispatched message
     * @param delivery the delivery backing the message
     */
    private void handleDelivered(JmsInboundMessageDispatch envelope, Delivery delivery) {
        LOG.debug("Delivered Ack of message: {}", envelope);

        deliveredCount += 1;
        envelope.setRecovered(false);
        envelope.setDelivered(true);
        delivery.setDefaultDeliveryState(AmqpSupport.MODIFIED_FAILED);
    }

    /**
     * Accepts and settles a delivery, then updates the delivered/dispatched counters.
     *
     * <ul>
     *   <li>Already settled remotely: settle locally only.</li>
     *   <li>Transacted session and not a browser: skip entirely (counters untouched) when the
     *       transaction is in doubt; otherwise, when a transaction id exists, apply the
     *       transactional accept state, settle, and register this consumer with the
     *       transaction context.</li>
     *   <li>Otherwise: apply Accepted and settle.</li>
     * </ul>
     *
     * @param envelope the dispatched message being accepted
     * @param delivery the delivery backing the message
     */
    private void handleAccepted(JmsInboundMessageDispatch envelope, Delivery delivery) {
        LOG.debug("Accepted Ack of message: {}", envelope);

        if (delivery.remotelySettled()) {
            delivery.settle();
        } else if (session.isTransacted() && !getResourceInfo().isBrowser()) {
            if (session.isTransactionInDoubt()) {
                LOG.trace("Skipping ack of message {} in failed transaction.", envelope);
                return;
            }

            Binary txnId = session.getTransactionContext().getAmqpTransactionId();
            if (txnId != null) {
                delivery.disposition(session.getTransactionContext().getTxnAcceptState());
                delivery.settle();
                session.getTransactionContext().registerTxConsumer(this);
            }
        } else {
            delivery.disposition(Accepted.getInstance());
            delivery.settle();
        }

        if (envelope.isDelivered()) {
            deliveredCount -= 1;
        }
        dispatchedCount -= 1;
    }

    /**
     * Applies {@code outcome} to the delivery, settles it and updates the counters: the
     * dispatched count always drops by one, the delivered count only if the envelope was
     * marked delivered.
     *
     * @param envelope the dispatched message
     * @param delivery the delivery to disposition and settle
     * @param outcome  the delivery state to apply
     */
    private void handleDisposition(JmsInboundMessageDispatch envelope, Delivery delivery, DeliveryState outcome) {
        delivery.disposition(outcome);
        delivery.settle();

        final boolean wasDelivered = envelope.isDelivered();
        if (wasDelivered) {
            deliveredCount -= 1;
        }
        dispatchedCount -= 1;
    }

    /**
     * Tops up link credit towards the configured prefetch size.
     *
     * Does nothing when prefetch is zero or a stop is in progress. Credit is granted only
     * when current credit is at most 50% of the prefetch AND current credit plus the
     * dispatched-but-not-yet-delivered messages is at most 70% of it; the grant brings that
     * sum back up to the prefetch size.
     */
    private void sendFlowIfNeeded() {
        final int prefetch = getResourceInfo().getPrefetchSize();
        if (prefetch == 0 || isStopping()) {
            return;
        }

        final int credit = getEndpoint().getCredit();
        if (credit > prefetch * 0.5) {
            return;
        }

        final int potentialPrefetch = credit + (dispatchedCount - deliveredCount);
        if (potentialPrefetch > prefetch * 0.7) {
            return;
        }

        final int extraCredit = prefetch - potentialPrefetch;
        LOG.trace("Consumer {} granting additional credit: {}", getConsumerId(), extraCredit);
        getEndpoint().flow(extraCredit);
    }

    /**
     * Ensures the link holds at least one credit, granting the difference when the current
     * credit is below one.
     */
    private void sendFlowForNoPrefetchListener() {
        final int credit = getEndpoint().getCredit();
        if (credit >= 1) {
            return;
        }

        final int extraCredit = 1 - credit;
        LOG.trace("Consumer {} granting additional credit: {}", getConsumerId(), extraCredit);
        getEndpoint().flow(extraCredit);
    }

    /**
     * Redelivers every message of this consumer that was delivered but is still unsettled.
     *
     * Each delivered envelope has its redelivery count incremented, is flagged to be enqueued
     * ahead of other pending messages, and has its delivered flag cleared. For the
     * acknowledgement modes that {@link #acknowledgeUndeliveredRecoveredMessages()} acts on,
     * it is also flagged as recovered. The collected envelopes are removed from the
     * delivered/dispatched counters and dispatched again; {@link #deliver} re-counts them as
     * dispatched. They are dispatched in reverse order, so that inserting each at the head of
     * the pending queue restores the original order.
     *
     * If a close was deferred, recovered-but-undelivered messages are dispositioned and the
     * deferred close is attempted.
     *
     * NOTE(review): the redelivery-count and enqueue-first calls are not used anywhere else in
     * this file - confirm them against the JmsInboundMessageDispatch / message facade API of
     * the library version in use.
     *
     * @throws Exception if re-dispatching a message fails
     */
    public void recover() throws Exception {
        LOG.debug("Session Recover for consumer: {}", getResourceInfo().getId());

        // Same acknowledgement modes that acknowledgeUndeliveredRecoveredMessages() checks.
        final boolean tracksRecovered = acknowledgementMode == 2
            || acknowledgementMode == 1
            || acknowledgementMode == 3
            || acknowledgementMode == INDIVIDUAL_ACKNOWLEDGE;

        ArrayList<JmsInboundMessageDispatch> redispatchList = new ArrayList<JmsInboundMessageDispatch>();

        Delivery delivery = getEndpoint().head();
        while (delivery != null) {
            Delivery current = delivery;
            delivery = delivery.next();

            if (!(current.getContext() instanceof JmsInboundMessageDispatch)) {
                LOG.debug("{} Found incomplete delivery with no context during recover processing", this);
                continue;
            }

            JmsInboundMessageDispatch envelope = (JmsInboundMessageDispatch) current.getContext();
            if (!envelope.isDelivered()) {
                // Never handed to the application, so there is nothing to recover.
                continue;
            }

            envelope.getMessage().getFacade().setRedeliveryCount(
                envelope.getMessage().getFacade().getRedeliveryCount() + 1);
            envelope.setEnqueueFirst(true);
            envelope.setDelivered(false);
            if (tracksRecovered) {
                envelope.setRecovered(true);
            }

            redispatchList.add(envelope);
        }

        // The messages are about to be dispatched again as if new, so take them out of both
        // counters first; deliver() re-increments the dispatched count.
        deliveredCount -= redispatchList.size();
        dispatchedCount -= redispatchList.size();

        ListIterator<JmsInboundMessageDispatch> reverseIterator = redispatchList.listIterator(redispatchList.size());
        while (reverseIterator.hasPrevious()) {
            deliver(reverseIterator.previous());
        }

        if (deferredClose) {
            acknowledgeUndeliveredRecoveredMessages();
            tryCompleteDeferredClose();
        }
    }

    /**
     * Requests a message for a consumer that may currently hold no credit.
     *
     * One credit is granted first when the link has none. Then, depending on {@code timeout}:
     * negative - the request is parked until a message arrives; zero - the consumer is stopped
     * right away; positive - a stop is scheduled after the timeout.
     *
     * @param timeout negative to wait indefinitely, zero for an immediate stop, positive for a
     *                delayed stop
     * @param request the request to complete when the pull finishes
     */
    public void pull(long timeout, AsyncResult request) {
        LOG.trace("Pull on consumer {} with timeout = {}", getConsumerId(), timeout);

        // Every timeout variant needs at least one credit to be able to receive anything.
        final Receiver link = getEndpoint();
        if (link.getCredit() == 0) {
            LOG.trace("Consumer {} granting 1 additional credit for pull.", getConsumerId());
            link.flow(1);
        }

        if (timeout < 0L) {
            pullRequest = request;
        } else if (timeout == 0L) {
            stop(request);
        } else {
            stopOnSchedule(timeout, request);
        }
    }

    /**
     * Handles an update for an incoming delivery.
     *
     * A delivery without a default outcome gets Released as default. A readable, complete
     * delivery is processed and, when that yields a message, a pending pull request is
     * completed. Once the link has no current delivery and no remote credit, a pending stop
     * request is completed. Finally the superclass handling runs.
     *
     * @param provider the provider passed through to the superclass handling
     * @param delivery the delivery that was updated
     * @throws ProviderException if processing the delivery fails or the superclass handling fails
     */
    @Override
    public void processDeliveryUpdates(AmqpProvider provider, Delivery delivery) throws ProviderException {
        if (delivery.getDefaultDeliveryState() == null) {
            delivery.setDefaultDeliveryState(Released.getInstance());
        }

        if (delivery.isReadable() && !delivery.isPartial()) {
            LOG.trace("{} has incoming Message(s).", this);
            try {
                boolean dispatched = processDelivery(delivery);
                if (dispatched && pullRequest != null) {
                    pullRequest.onSuccess();
                    pullRequest = null;
                }
            } catch (Exception e) {
                throw ProviderExceptionSupport.createNonFatalOrPassthrough(e);
            }
        }

        if (getEndpoint().current() == null) {
            if (getEndpoint().getRemoteCredit() <= 0) {
                if (stopRequest != null) {
                    stopRequest.onSuccess();
                    stopRequest = null;
                }
            }
        }

        super.processDeliveryUpdates(provider, delivery);
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    /**
     * Decodes an incoming delivery into a JMS message and dispatches it.
     *
     * When decoding fails, the delivery is settled as MODIFIED_FAILED_UNDELIVERABLE, credit is
     * topped up if needed and {@code false} is returned. Otherwise the message is wrapped in
     * an envelope that is linked to the delivery in both directions (provider hint and
     * delivery context) and dispatched; the receiver is advanced even if dispatching throws.
     *
     * @param incoming the delivery to process
     * @return {@code true} if a message was decoded and passed to {@link #deliver};
     *         {@code false} if decoding failed
     * @throws Exception if dispatch preparation or delivery fails
     */
    private boolean processDelivery(Delivery incoming) throws Exception {
        final JmsMessage decoded;
        try {
            decoded = AmqpCodec.decodeMessage(this, getEndpoint().recv()).asJmsMessage();
        } catch (Exception e) {
            LOG.warn("Error on transform: {}", e.getMessage());
            incoming.disposition(AmqpSupport.MODIFIED_FAILED_UNDELIVERABLE);
            incoming.settle();
            sendFlowIfNeeded();
            return false;
        }

        try {
            decoded.onDispatch();

            JmsInboundMessageDispatch dispatch = new JmsInboundMessageDispatch(getNextIncomingSequenceNumber());
            dispatch.setMessage(decoded);
            dispatch.setConsumerId(getResourceInfo().getId());
            dispatch.setConsumerInfo(getResourceInfo());
            // The delivery travels with the envelope so acknowledgements can find it again.
            dispatch.setProviderHint(incoming);
            dispatch.setMessageId(decoded.getFacade().getProviderMessageIdObject());
            // The envelope is kept on the delivery so link walks can find it again.
            incoming.setContext(dispatch);

            deliver(dispatch);
            return true;
        } finally {
            getEndpoint().advance();
        }
    }

    /**
     * Advances the incoming sequence counter by one.
     *
     * @return the counter value after the increment
     */
    protected long getNextIncomingSequenceNumber() {
        incomingSequence += 1;
        return incomingSequence;
    }

    /**
     * Ends the receiver link: a durable consumer's link is detached, any other consumer's
     * link is closed.
     */
    @Override
    protected void closeOrDetachEndpoint() {
        final boolean durable = getResourceInfo().isDurable();
        if (!durable) {
            getEndpoint().close();
            return;
        }
        getEndpoint().detach();
    }

    /**
     * @return the connection of the session that owns this consumer
     */
    public AmqpConnection getConnection() {
        return session.getConnection();
    }

    /**
     * @return the session that owns this consumer
     */
    public AmqpSession getSession() {
        return session;
    }

    /**
     * @return the consumer id taken from this consumer's resource info
     */
    public JmsConsumerId getConsumerId() {
        final JmsConsumerInfo info = getResourceInfo();
        return info.getId();
    }

    /**
     * @return the destination taken from this consumer's resource info
     */
    public JmsDestination getDestination() {
        final JmsConsumerInfo info = getResourceInfo();
        return info.getDestination();
    }

    /**
     * @return {@code true} while a stop request is recorded and has not yet been completed
     */
    public boolean isStopping() {
        return stopRequest != null;
    }

    /**
     * @return the drain timeout configured on the session's provider
     */
    public int getDrainTimeout() {
        final AmqpProvider provider = session.getProvider();
        return provider.getDrainTimeout();
    }

    /**
     * @return a short description containing this consumer's id
     */
    @Override
    public String toString() {
        return new StringBuilder("AmqpConsumer { ")
            .append(getResourceInfo().getId())
            .append(" }")
            .toString();
    }

    /**
     * Passes a message to the provider listener and counts it as dispatched.
     *
     * Nothing happens once a close has been deferred. When no listener is registered the
     * message is dropped and an error is logged.
     *
     * @param envelope the message to dispatch
     * @throws Exception if the listener fails to accept the message
     */
    protected void deliver(JmsInboundMessageDispatch envelope) throws Exception {
        if (deferredClose) {
            return;
        }

        final ProviderListener target = session.getProvider().getProviderListener();
        if (target == null) {
            LOG.error("Provider listener is not set, message will be dropped: {}", envelope);
            return;
        }

        LOG.debug("Dispatching received message: {}", envelope);
        dispatchedCount += 1;
        target.onInboundMessage(envelope);
    }

    /**
     * Transaction hook named for the point before a commit; this consumer does nothing here.
     */
    public void preCommit() {
    }

    /**
     * Transaction hook named for the point before a rollback; this consumer does nothing here.
     */
    public void preRollback() {
    }

    /**
     * Transaction hook named for the point after a commit: attempts to finish a close that
     * was deferred.
     */
    public void postCommit() {
        tryCompleteDeferredClose();
    }

    /**
     * Transaction hook named for the point after a rollback: releases prefetched messages
     * that were never delivered, then attempts to finish a close that was deferred.
     */
    public void postRollback() {
        releasePrefetch();
        tryCompleteDeferredClose();
    }

    /**
     * Cleans up when this consumer resource is closed: removes the consumer from the
     * connection's subscription tracker and completes any pending stop and pull requests so
     * nothing stays blocked on them.
     *
     * @param provider the provider reporting the closure (not used here)
     * @param cause    the failure that closed the resource, or {@code null} for a normal close
     */
    @Override
    public void handleResourceClosure(AmqpProvider provider, ProviderException cause) {
        session.getConnection().getSubTracker().consumerRemoved(getResourceInfo());

        if (stopRequest != null) {
            completeOnClosure(stopRequest, cause);
            stopRequest = null;
        }

        if (pullRequest != null) {
            completeOnClosure(pullRequest, cause);
            pullRequest = null;
        }
    }

    /**
     * Completes a pending request as successful when no cause is given, as failed otherwise.
     *
     * @param pending the request to complete
     * @param cause   the closure cause, or {@code null}
     */
    private static void completeOnClosure(AsyncResult pending, ProviderException cause) {
        if (cause != null) {
            pending.onFailure(cause);
        } else {
            pending.onSuccess();
        }
    }

    /**
     * Decides whether a close must be deferred.
     *
     * @return {@code true} if the session is transacted and this consumer takes part in the
     *         current transaction, or if delivered messages are still outstanding
     */
    private boolean shouldDeferClose() {
        final boolean inTransaction = getSession().isTransacted()
            && getSession().getTransactionContext().isInTransaction(getConsumerId());
        return inTransaction || deliveredCount > 0;
    }

    /**
     * Performs the real close once a close has been deferred and no delivered messages
     * remain outstanding.
     */
    private void tryCompleteDeferredClose() {
        if (!deferredClose || deliveredCount != 0) {
            return;
        }
        super.close(new DeferredCloseRequest());
    }

    /**
     * Settles every prefetched message that has not been delivered to the application with a
     * Released outcome. Deliveries whose context is not a {@link JmsInboundMessageDispatch}
     * are logged and skipped.
     */
    private void releasePrefetch() {
        Delivery delivery = getEndpoint().head();
        while (delivery != null) {
            // Advance BEFORE handling: handleDisposition() settles the delivery, and asking a
            // settled delivery for next() can end the walk early, leaving later prefetched
            // messages unreleased. This mirrors the iteration used by acknowledge(ACK_TYPE).
            Delivery current = delivery;
            delivery = delivery.next();

            if (!(current.getContext() instanceof JmsInboundMessageDispatch)) {
                LOG.debug("{} Found incomplete delivery with no context during release processing", this);
                continue;
            }

            JmsInboundMessageDispatch envelope = (JmsInboundMessageDispatch) current.getContext();
            if (!envelope.isDelivered()) {
                handleDisposition(envelope, current, Released.getInstance());
            }
        }
    }

    /**
     * Request wrapper that pairs an original request with a scheduled task and cancels that
     * task when the request completes.
     */
    private static final class ScheduledRequest
    implements AsyncResult {
        private final ScheduledFuture<?> scheduledTask;
        private final AsyncResult delegate;

        /**
         * @param completionTask the scheduled task to cancel on completion
         * @param origRequest    the request that receives the forwarded outcome
         */
        public ScheduledRequest(ScheduledFuture<?> completionTask, AsyncResult origRequest) {
            this.scheduledTask = completionTask;
            this.delegate = origRequest;
        }

        /** Cancels the scheduled task (without interrupting it) and forwards the failure. */
        @Override
        public void onFailure(ProviderException cause) {
            scheduledTask.cancel(false);
            delegate.onFailure(cause);
        }

        /**
         * Forwards success only when the scheduled task could still be cancelled; otherwise
         * the outcome is left to that task.
         */
        @Override
        public void onSuccess() {
            if (scheduledTask.cancel(false)) {
                delegate.onSuccess();
            }
        }

        /** @return whether the wrapped request reports completion */
        @Override
        public boolean isComplete() {
            return delegate.isComplete();
        }
    }

    /**
     * Request used for a close that was deferred: there is no caller left to notify, so the
     * outcome is only logged and a failure is reported as a non-fatal provider exception.
     */
    private final class DeferredCloseRequest
    implements AsyncResult {

        /** Logs the failure and reports it through the parent's provider as non-fatal. */
        @Override
        public void onFailure(ProviderException result) {
            LOG.trace("Failed deferred close of consumer: {} - {}", getConsumerId(), result.getMessage());
            ProviderException nonFatal = ProviderExceptionSupport.createNonFatalOrPassthrough(result);
            getParent().getProvider().fireNonFatalProviderException(nonFatal);
        }

        /** Logs that the deferred close completed. */
        @Override
        public void onSuccess() {
            LOG.trace("Completed deferred close of consumer: {}", getConsumerId());
        }

        /** @return whether the enclosing consumer reports itself closed */
        @Override
        public boolean isComplete() {
            return isClosed();
        }
    }

    /**
     * Wraps a close request so that, when the stop succeeds, undelivered prefetched messages
     * are released before the success is passed on to the wrapped request.
     */
    private final class StopAndReleaseRequest
    extends WrappedAsyncResult {

        /**
         * @param closeRequest the close request to complete after the prefetch is released
         */
        public StopAndReleaseRequest(AsyncResult closeRequest) {
            super(closeRequest);
        }

        /** Releases the prefetch, then signals success to the wrapped request. */
        @Override
        public void onSuccess() {
            releasePrefetch();
            super.onSuccess();
        }
    }
}

