/*
 * Decompiled with CFR 0.152.
 */
package org.springframework.integration.gateway;

import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicLong;
import org.reactivestreams.Publisher;
import org.reactivestreams.Subscriber;
import org.springframework.core.AttributeAccessor;
import org.springframework.integration.MessageTimeoutException;
import org.springframework.integration.channel.ReactiveStreamsSubscribableChannel;
import org.springframework.integration.core.MessagingTemplate;
import org.springframework.integration.endpoint.AbstractEndpoint;
import org.springframework.integration.endpoint.EventDrivenConsumer;
import org.springframework.integration.endpoint.PollingConsumer;
import org.springframework.integration.endpoint.ReactiveStreamsConsumer;
import org.springframework.integration.handler.BridgeHandler;
import org.springframework.integration.history.HistoryWritingMessagePostProcessor;
import org.springframework.integration.mapping.InboundMessageMapper;
import org.springframework.integration.mapping.MessageMappingException;
import org.springframework.integration.mapping.OutboundMessageMapper;
import org.springframework.integration.support.DefaultErrorMessageStrategy;
import org.springframework.integration.support.DefaultMessageBuilderFactory;
import org.springframework.integration.support.ErrorMessageStrategy;
import org.springframework.integration.support.ErrorMessageUtils;
import org.springframework.integration.support.MessageBuilderFactory;
import org.springframework.integration.support.MutableMessageBuilder;
import org.springframework.integration.support.converter.SimpleMessageConverter;
import org.springframework.integration.support.management.IntegrationManagedResource;
import org.springframework.integration.support.management.IntegrationManagement;
import org.springframework.integration.support.management.MessageSourceMetrics;
import org.springframework.integration.support.management.TrackableComponent;
import org.springframework.lang.Nullable;
import org.springframework.messaging.Message;
import org.springframework.messaging.MessageChannel;
import org.springframework.messaging.MessageDeliveryException;
import org.springframework.messaging.MessagingException;
import org.springframework.messaging.PollableChannel;
import org.springframework.messaging.SubscribableChannel;
import org.springframework.messaging.support.ErrorMessage;
import org.springframework.messaging.support.MessageBuilder;
import org.springframework.util.Assert;
import reactor.core.publisher.Mono;

@IntegrationManagedResource
public abstract class MessagingGatewaySupport
extends AbstractEndpoint
implements TrackableComponent,
MessageSourceMetrics {
    /** Default timeout in milliseconds, used for both the send and the reply timeout. */
    private static final long DEFAULT_TIMEOUT = 1000L;

    /** Converter installed on {@link #messagingTemplate}; request/reply mappers are set on it. */
    private final SimpleMessageConverter messageConverter = new SimpleMessageConverter();

    /** Template through which all send/receive operations of this gateway are performed. */
    protected final MessagingTemplate messagingTemplate;

    /** Post-processor applied to outgoing request messages (message history tracking). */
    private final HistoryWritingMessagePostProcessor historyWritingPostProcessor = new HistoryWritingMessagePostProcessor();

    /** Monitor guarding the lazy creation of {@link #replyMessageCorrelator}. */
    private final Object replyMessageCorrelatorMonitor = new Object();

    /** Whether a {@code null} reply is turned into a {@link MessageTimeoutException}. */
    private final boolean errorOnTimeout;

    /** Number of requests counted while {@link #countsEnabled} is set. */
    private final AtomicLong messageCount = new AtomicLong();

    /** Records which management settings were explicitly configured via the setters. */
    private final IntegrationManagement.ManagementOverrides managementOverrides = new IntegrationManagement.ManagementOverrides();

    /** Strategy used by {@link #buildErrorMessage} to create error messages. */
    private ErrorMessageStrategy errorMessageStrategy = new DefaultErrorMessageStrategy();

    // Each channel may be given either directly or by name; a name is resolved lazily by the getter.
    private volatile MessageChannel requestChannel;
    private volatile String requestChannelName;
    private volatile MessageChannel replyChannel;
    private volatile String replyChannelName;
    private volatile MessageChannel errorChannel;
    private volatile String errorChannelName;

    /** Reply timeout in milliseconds; also handed to a polling reply correlator. */
    private volatile long replyTimeout = DEFAULT_TIMEOUT;

    /** Maps request objects to messages when the request is not already a {@link Message}. */
    private volatile InboundMessageMapper requestMapper = new DefaultRequestMapper();

    /** Set at the end of {@link #onInit()}; checked by {@link #initializeIfNecessary()}. */
    private volatile boolean initialized;

    /** Lazily created endpoint that consumes from the configured reply channel. */
    private volatile AbstractEndpoint replyMessageCorrelator;

    private volatile String managedType;
    private volatile String managedName;

    /** When {@code true}, requests increment {@link #messageCount}. */
    private volatile boolean countsEnabled;
    private volatile boolean loggingEnabled = true;

    /**
     * Creates a gateway with {@code errorOnTimeout} set to {@code false}.
     */
    public MessagingGatewaySupport() {
        this(false);
    }

    /**
     * Creates a gateway and configures its {@link MessagingTemplate} with this gateway's
     * message converter, the default send timeout and the current reply timeout.
     *
     * @param errorOnTimeout if {@code true}, a {@code null} reply from a send-and-receive
     * operation is treated as a {@link MessageTimeoutException}
     */
    public MessagingGatewaySupport(boolean errorOnTimeout) {
        this.errorOnTimeout = errorOnTimeout;
        this.messagingTemplate = new MessagingTemplate();
        this.messagingTemplate.setMessageConverter(this.messageConverter);
        this.messagingTemplate.setSendTimeout(DEFAULT_TIMEOUT);
        this.messagingTemplate.setReceiveTimeout(this.replyTimeout);
    }

    /**
     * Sets the channel to which request messages are sent.
     * Mutually exclusive with a request channel name (checked in {@code onInit()}).
     *
     * @param requestChannel the request channel
     */
    public void setRequestChannel(MessageChannel requestChannel) {
        this.requestChannel = requestChannel;
    }

    /**
     * Sets the name of the request channel; the name is resolved to a channel on first
     * use by {@link #getRequestChannel()}.
     *
     * @param requestChannelName the channel name, must contain text
     */
    public void setRequestChannelName(String requestChannelName) {
        Assert.hasText(requestChannelName, "'requestChannelName' must not be empty");
        this.requestChannelName = requestChannelName;
    }

    /**
     * Sets the channel from which replies are received.
     * Mutually exclusive with a reply channel name (checked in {@code onInit()}).
     *
     * @param replyChannel the reply channel
     */
    public void setReplyChannel(MessageChannel replyChannel) {
        this.replyChannel = replyChannel;
    }

    /**
     * Sets the name of the reply channel; the name is resolved to a channel on first
     * use by {@link #getReplyChannel()}.
     *
     * @param replyChannelName the channel name, must contain text
     */
    public void setReplyChannelName(String replyChannelName) {
        Assert.hasText(replyChannelName, "'replyChannelName' must not be empty");
        this.replyChannelName = replyChannelName;
    }

    /**
     * Sets the channel to which error messages are sent when a gateway operation fails.
     * Mutually exclusive with an error channel name (checked in {@code onInit()}).
     *
     * @param errorChannel the error channel
     */
    public void setErrorChannel(MessageChannel errorChannel) {
        this.errorChannel = errorChannel;
    }

    /**
     * Sets the name of the error channel; the name is resolved to a channel on first
     * use by {@link #getErrorChannel()}.
     *
     * @param errorChannelName the channel name, must contain text
     */
    public void setErrorChannelName(String errorChannelName) {
        Assert.hasText(errorChannelName, "'errorChannelName' must not be empty");
        this.errorChannelName = errorChannelName;
    }

    /**
     * Sets the send timeout of the underlying messaging template.
     *
     * @param requestTimeout the timeout passed to the template's {@code setSendTimeout}
     */
    public void setRequestTimeout(long requestTimeout) {
        this.messagingTemplate.setSendTimeout(requestTimeout);
    }

    /**
     * Sets the reply timeout, both on this gateway and as the receive timeout of the
     * underlying messaging template.
     *
     * @param replyTimeout the reply timeout
     */
    public void setReplyTimeout(long replyTimeout) {
        this.replyTimeout = replyTimeout;
        this.messagingTemplate.setReceiveTimeout(replyTimeout);
    }

    /**
     * Sets the mapper that turns request objects into messages. The mapper is also
     * registered as the inbound mapper of the message converter.
     *
     * @param requestMapper the mapper to use; {@code null} installs a new default mapper
     */
    public void setRequestMapper(InboundMessageMapper<?> requestMapper) {
        InboundMessageMapper<?> mapperToUse = requestMapper;
        if (mapperToUse == null) {
            mapperToUse = new DefaultRequestMapper();
        }
        this.requestMapper = mapperToUse;
        this.messageConverter.setInboundMessageMapper(mapperToUse);
    }

    /**
     * Registers the given mapper as the outbound mapper of the message converter.
     *
     * @param replyMapper the reply mapper
     */
    public void setReplyMapper(OutboundMessageMapper<?> replyMapper) {
        this.messageConverter.setOutboundMessageMapper(replyMapper);
    }

    /**
     * Forwards the tracking flag to the history-writing post-processor that is applied
     * to request messages.
     *
     * @param shouldTrack whether tracking should be enabled
     */
    @Override
    public void setShouldTrack(boolean shouldTrack) {
        this.historyWritingPostProcessor.setShouldTrack(shouldTrack);
    }

    /**
     * Returns the message count narrowed to an {@code int}.
     * The narrowing is a plain cast, so counts beyond the {@code int} range are not
     * preserved; use {@link #getMessageCountLong()} for the full value.
     *
     * @return the current count as an {@code int}
     */
    @Override
    public int getMessageCount() {
        long total = this.messageCount.get();
        return (int) total;
    }

    /**
     * Returns the current message count.
     *
     * @return the current value of the message counter
     */
    @Override
    public long getMessageCountLong() {
        return this.messageCount.get();
    }

    /**
     * Stores the managed name of this gateway.
     *
     * @param name the managed name
     */
    @Override
    public void setManagedName(String name) {
        this.managedName = name;
    }

    /**
     * Returns the managed name previously stored via {@link #setManagedName(String)}.
     *
     * @return the managed name, or {@code null} if none has been set
     */
    @Override
    public String getManagedName() {
        return this.managedName;
    }

    /**
     * Stores the managed type of this gateway.
     *
     * @param type the managed type
     */
    @Override
    public void setManagedType(String type) {
        this.managedType = type;
    }

    /**
     * Returns the managed type previously stored via {@link #setManagedType(String)}.
     *
     * @return the managed type, or {@code null} if none has been set
     */
    @Override
    public String getManagedType() {
        return this.managedType;
    }

    /**
     * Returns the component type identifier of this endpoint.
     *
     * @return the constant string {@code "gateway"}
     */
    @Override
    public String getComponentType() {
        return "gateway";
    }

    /**
     * Sets the logging-enabled flag and records in the management overrides that
     * logging was explicitly configured.
     *
     * @param enabled the new value of the logging flag
     */
    @Override
    public void setLoggingEnabled(boolean enabled) {
        this.loggingEnabled = enabled;
        this.managementOverrides.loggingConfigured = true;
    }

    /**
     * Returns the logging-enabled flag (initially {@code true}).
     *
     * @return the current value of the logging flag
     */
    @Override
    public boolean isLoggingEnabled() {
        return this.loggingEnabled;
    }

    /**
     * Enables or disables message counting and records in the management overrides
     * that counting was explicitly configured.
     *
     * @param countsEnabled whether requests should increment the message counter
     */
    @Override
    public void setCountsEnabled(boolean countsEnabled) {
        this.countsEnabled = countsEnabled;
        this.managementOverrides.countsConfigured = true;
    }

    /**
     * Returns whether message counting is enabled.
     *
     * @return the current value of the counts flag
     */
    @Override
    public boolean isCountsEnabled() {
        return this.countsEnabled;
    }

    /**
     * Sets the strategy used by {@link #buildErrorMessage} to create error messages.
     *
     * @param errorMessageStrategy the strategy, must not be {@code null}
     */
    public final void setErrorMessageStrategy(ErrorMessageStrategy errorMessageStrategy) {
        Assert.notNull(errorMessageStrategy, "'errorMessageStrategy' cannot be null");
        this.errorMessageStrategy = errorMessageStrategy;
    }

    /**
     * Returns the object recording which management settings (logging, counts) were
     * explicitly configured through this gateway's setters.
     *
     * @return the management overrides of this gateway
     */
    @Override
    public IntegrationManagement.ManagementOverrides getOverrides() {
        return this.managementOverrides;
    }

    /**
     * Validates the channel configuration and wires collaborators.
     * <p>For each of the request, reply and error channels, a channel instance and a
     * channel name must not both be set. The history post-processor is bound to this
     * gateway, and, when a bean factory is available, it is propagated to the messaging
     * template and the message converter; a default request mapper additionally receives
     * this gateway's message builder factory.
     *
     * @throws IllegalStateException if a channel and its name are both configured
     */
    @Override
    protected void onInit() throws Exception {
        Assert.state(this.requestChannelName == null || this.requestChannel == null,
                "'requestChannelName' and 'requestChannel' are mutually exclusive.");
        Assert.state(this.replyChannelName == null || this.replyChannel == null,
                "'replyChannelName' and 'replyChannel' are mutually exclusive.");
        Assert.state(this.errorChannelName == null || this.errorChannel == null,
                "'errorChannelName' and 'errorChannel' are mutually exclusive.");

        this.historyWritingPostProcessor.setTrackableComponent(this);
        this.historyWritingPostProcessor.setMessageBuilderFactory(this.getMessageBuilderFactory());

        boolean beanFactoryAvailable = this.getBeanFactory() != null;
        if (beanFactoryAvailable) {
            this.messagingTemplate.setBeanFactory(this.getBeanFactory());
            if (this.requestMapper instanceof DefaultRequestMapper) {
                DefaultRequestMapper defaultMapper = (DefaultRequestMapper) this.requestMapper;
                defaultMapper.setMessageBuilderFactory(this.getMessageBuilderFactory());
            }
            this.messageConverter.setBeanFactory(this.getBeanFactory());
        }

        this.initialized = true;
    }

    /**
     * Runs {@code afterPropertiesSet()} if {@link #onInit()} has not completed yet, so
     * that the gateway can be used without explicit initialization.
     */
    private void initializeIfNecessary() {
        if (this.initialized) {
            return;
        }
        this.afterPropertiesSet();
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    /**
     * Returns the request channel, resolving it from the configured channel name on
     * first access. After a successful resolution the name is cleared so that the
     * lookup is not repeated.
     *
     * @return the request channel, or {@code null} if none is configured
     */
    public MessageChannel getRequestChannel() {
        if (this.requestChannelName == null) {
            return this.requestChannel;
        }
        synchronized (this) {
            // Re-check under the lock: another thread may have resolved the name already.
            String channelName = this.requestChannelName;
            if (channelName != null) {
                this.requestChannel = (MessageChannel) this.getChannelResolver().resolveDestination(channelName);
                this.requestChannelName = null;
            }
        }
        return this.requestChannel;
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    /**
     * Returns the reply channel, resolving it from the configured channel name on
     * first access. After a successful resolution the name is cleared so that the
     * lookup is not repeated.
     *
     * @return the reply channel, or {@code null} if none is configured
     */
    protected MessageChannel getReplyChannel() {
        if (this.replyChannelName == null) {
            return this.replyChannel;
        }
        synchronized (this) {
            // Re-check under the lock: another thread may have resolved the name already.
            String channelName = this.replyChannelName;
            if (channelName != null) {
                this.replyChannel = (MessageChannel) this.getChannelResolver().resolveDestination(channelName);
                this.replyChannelName = null;
            }
        }
        return this.replyChannel;
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    /**
     * Returns the error channel, resolving it from the configured channel name on
     * first access. After a successful resolution the name is cleared so that the
     * lookup is not repeated.
     *
     * @return the error channel, or {@code null} if none is configured
     */
    public MessageChannel getErrorChannel() {
        if (this.errorChannelName == null) {
            return this.errorChannel;
        }
        synchronized (this) {
            // Re-check under the lock: another thread may have resolved the name already.
            String channelName = this.errorChannelName;
            if (channelName != null) {
                this.errorChannel = (MessageChannel) this.getChannelResolver().resolveDestination(channelName);
                this.errorChannelName = null;
            }
        }
        return this.errorChannel;
    }

    /**
     * Converts the given object to a message and sends it to the request channel
     * without waiting for a reply.
     * <p>If sending fails and an error channel is configured, the failure is delivered
     * to that channel as an {@link ErrorMessage} and is considered handled; this matches
     * the send-and-receive operations, which only rethrow when no error channel exists.
     * Without an error channel the failure is rethrown (checked exceptions are wrapped
     * in a {@link MessagingException}).
     *
     * @param object the request payload or message, must not be {@code null}
     * @throws IllegalStateException if no request channel has been configured
     */
    protected void send(Object object) {
        this.initializeIfNecessary();
        Assert.notNull(object, "request must not be null");
        MessageChannel requestChannel = this.getRequestChannel();
        Assert.state(requestChannel != null, "send is not supported, because no request channel has been configured");
        try {
            if (this.countsEnabled) {
                this.messageCount.incrementAndGet();
            }
            this.messagingTemplate.convertAndSend(requestChannel, object, this.historyWritingPostProcessor);
        }
        catch (Exception e) {
            MessageChannel errorChannel = this.getErrorChannel();
            if (errorChannel != null) {
                // The error flow owns the failure; do not report it a second time.
                this.messagingTemplate.send(errorChannel, new ErrorMessage(e));
            } else {
                this.rethrow(e, "failed to send message");
            }
        }
    }

    /**
     * Receives a message from the reply channel using the messaging template and
     * returns its converted content.
     *
     * @return the converted reply as returned by the template
     * @throws IllegalStateException if the reply channel is missing or not pollable
     */
    protected Object receive() {
        this.initializeIfNecessary();
        MessageChannel replyChannel = this.getReplyChannel();
        // instanceof is false for null, so this also covers a missing reply channel.
        boolean pollable = replyChannel instanceof PollableChannel;
        Assert.state(pollable, "receive is not supported, because no pollable reply channel has been configured");
        return this.messagingTemplate.receiveAndConvert(replyChannel, Object.class);
    }

    /**
     * Receives a message from the reply channel using the messaging template,
     * without converting it.
     *
     * @return the reply message as returned by the template
     * @throws IllegalStateException if the reply channel is missing or not pollable
     */
    protected Message<?> receiveMessage() {
        this.initializeIfNecessary();
        MessageChannel replyChannel = this.getReplyChannel();
        boolean pollable = replyChannel instanceof PollableChannel;
        Assert.state(pollable, "receive is not supported, because no pollable reply channel has been configured");
        return this.messagingTemplate.receive(replyChannel);
    }

    /**
     * Receives a message from the reply channel using the given timeout and returns
     * its converted content.
     *
     * @param timeout the receive timeout passed to the messaging template
     * @return the converted reply as returned by the template
     * @throws IllegalStateException if the reply channel is missing or not pollable
     */
    protected Object receive(long timeout) {
        this.initializeIfNecessary();
        MessageChannel replyChannel = this.getReplyChannel();
        // instanceof is false for null, so this also covers a missing reply channel.
        boolean pollable = replyChannel instanceof PollableChannel;
        Assert.state(pollable, "receive is not supported, because no pollable reply channel has been configured");
        return this.messagingTemplate.receiveAndConvert(replyChannel, timeout);
    }

    /**
     * Receives a message from the reply channel using the given timeout, without
     * converting it.
     *
     * @param timeout the receive timeout passed to the messaging template
     * @return the reply message as returned by the template
     * @throws IllegalStateException if the reply channel is missing or not pollable
     */
    protected Message<?> receiveMessage(long timeout) {
        this.initializeIfNecessary();
        MessageChannel replyChannel = this.getReplyChannel();
        boolean pollable = replyChannel instanceof PollableChannel;
        Assert.state(pollable, "receive is not supported, because no pollable reply channel has been configured");
        return this.messagingTemplate.receive(replyChannel, timeout);
    }

    /**
     * Sends the given request and returns the converted reply.
     *
     * @param object the request payload or message, must not be {@code null}
     * @return the result of the conversion-enabled send-and-receive operation
     */
    protected Object sendAndReceive(Object object) {
        return this.doSendAndReceive(object, true);
    }

    /**
     * Sends the given request and returns the reply as a message, without conversion.
     *
     * @param object the request payload or message, must not be {@code null}
     * @return the reply message, possibly {@code null}
     */
    protected Message<?> sendAndReceiveMessage(Object object) {
        Object reply = this.doSendAndReceive(object, false);
        return (Message<?>) reply;
    }

    /**
     * Sends a request to the request channel and waits for the reply.
     * <p>With {@code shouldConvert} the template converts the request and reply; a reply
     * that is itself a {@link Throwable} is treated as an error. Without conversion the
     * request is mapped to a message (if it is not one already), post-processed for
     * history, and an {@link ErrorMessage} reply is treated as an error. A {@code null}
     * reply becomes a {@link MessageTimeoutException} when {@code errorOnTimeout} is set.
     * <p>On error, if an error channel is configured, an error message is sent to it and
     * the error flow's reply (or its payload, when converting) is returned; a
     * {@link Throwable} returned by the error flow is rethrown. Without an error channel
     * the error itself is rethrown, wrapped in a {@link MessagingException} if checked.
     *
     * @param object the request payload or message, must not be {@code null}
     * @param shouldConvert whether request and reply go through the message converter
     * @return the reply (converted payload or message), or {@code null} if none arrived
     * and {@code errorOnTimeout} is not set
     */
    private Object doSendAndReceive(Object object, boolean shouldConvert) {
        this.initializeIfNecessary();
        Assert.notNull(object, "request must not be null");
        MessageChannel requestChannel = this.getRequestChannel();
        if (requestChannel == null) {
            throw new MessagingException("No request channel available. Cannot send request message.");
        }
        this.registerReplyMessageCorrelatorIfNecessary();
        // The reply is a converted payload or a Message depending on shouldConvert.
        Object reply = null;
        Throwable error = null;
        Message<?> requestMessage = null;
        try {
            if (this.countsEnabled) {
                this.messageCount.incrementAndGet();
            }
            if (shouldConvert) {
                reply = this.messagingTemplate.convertSendAndReceive(requestChannel, object, Object.class, this.historyWritingPostProcessor);
                if (reply instanceof Throwable) {
                    error = (Throwable) reply;
                }
            } else {
                requestMessage = object instanceof Message ? (Message<?>) object : this.requestMapper.toMessage(object);
                requestMessage = this.historyWritingPostProcessor.postProcessMessage(requestMessage);
                reply = this.messagingTemplate.sendAndReceive(requestChannel, requestMessage);
                if (reply instanceof ErrorMessage) {
                    error = ((ErrorMessage) reply).getPayload();
                }
            }
            if (reply == null && this.errorOnTimeout) {
                error = newTimeoutException(object, "No reply received within timeout");
            }
        }
        catch (Exception e) {
            if (this.logger.isDebugEnabled()) {
                this.logger.debug("failure occurred in gateway sendAndReceive: " + e.getMessage());
            }
            error = e;
        }
        if (error != null) {
            MessageChannel errorChannel = this.getErrorChannel();
            if (errorChannel != null) {
                ErrorMessage errorMessage = this.buildErrorMessage(requestMessage, error);
                Message<?> errorFlowReply;
                try {
                    errorFlowReply = this.messagingTemplate.sendAndReceive(errorChannel, errorMessage);
                }
                catch (Exception errorFlowFailure) {
                    throw new MessagingException(errorMessage, "failure occurred in error-handling flow", errorFlowFailure);
                }
                if (shouldConvert) {
                    Object result = errorFlowReply != null ? errorFlowReply.getPayload() : null;
                    if (result instanceof Throwable) {
                        this.rethrow((Throwable) result, "error flow returned Exception");
                    }
                    return result;
                }
                if (errorFlowReply != null && errorFlowReply.getPayload() instanceof Throwable) {
                    this.rethrow((Throwable) errorFlowReply.getPayload(), "error flow returned an Error Message");
                }
                if (errorFlowReply == null && this.errorOnTimeout) {
                    throw newTimeoutException(object, "No reply received from error channel within timeout");
                }
                return errorFlowReply;
            }
            this.rethrow(error, "gateway received checked Exception");
        }
        return reply;
    }

    /**
     * Builds a timeout exception, attaching the request as the failed message when
     * the request object is itself a {@link Message}.
     */
    private static MessageTimeoutException newTimeoutException(Object request, String description) {
        if (request instanceof Message) {
            return new MessageTimeoutException((Message<?>) request, description);
        }
        return new MessageTimeoutException(description);
    }

    /**
     * Reactive variant of the send-and-receive operation: returns a {@link Mono} that,
     * when subscribed, sends the request and completes with the reply message.
     * <p>A missing request channel is reported by throwing immediately, not through
     * the returned {@code Mono}.
     *
     * @param object the request payload or message, must not be {@code null}
     * @return a deferred {@code Mono} producing the reply message
     * @throws MessagingException if no request channel is available
     */
    protected Mono<Message<?>> sendAndReceiveMessageReactive(Object object) {
        this.initializeIfNecessary();
        Assert.notNull(object, "request must not be null");

        MessageChannel channel = this.getRequestChannel();
        if (channel == null) {
            throw new MessagingException("No request channel available. Cannot send request message.");
        }

        this.registerReplyMessageCorrelatorIfNecessary();
        return this.doSendAndReceiveMessageReactive(channel, object, false);
    }

    /**
     * Builds a deferred {@link Mono} that sends the request on subscription and
     * completes with the reply.
     * <p>The request is mapped to a message and post-processed for history. A copy is
     * then sent whose reply and error channels both point at a one-shot
     * {@link FutureReplyChannel} and whose send/receive timeout headers are removed.
     * The original reply/error channel headers are restored on the reply.
     * <p>The send timeout is taken from the message as it was before the timeout
     * headers were removed, so that a per-message timeout header is honoured; reading
     * it from the stripped copy would always yield the template default.
     *
     * @param requestChannel the channel to send to
     * @param object the request payload or message
     * @param error {@code true} when this call is itself the error flow; failures are
     * then propagated as-is and the message counter is not incremented
     * @return a deferred {@code Mono} producing the reply message
     */
    private Mono<Message<?>> doSendAndReceiveMessageReactive(MessageChannel requestChannel, Object object, boolean error) {
        return Mono.defer(() -> {
            Message<?> message;
            try {
                message = object instanceof Message ? (Message<?>) object : this.requestMapper.toMessage(object);
                message = this.historyWritingPostProcessor.postProcessMessage(message);
            }
            catch (Exception e) {
                throw new MessageMappingException("Cannot map to message: " + object, e);
            }
            Object originalReplyChannelHeader = message.getHeaders().getReplyChannel();
            Object originalErrorChannelHeader = message.getHeaders().getErrorChannel();
            FutureReplyChannel replyChannel = new FutureReplyChannel();
            // Setting a header to null removes it from the outgoing copy.
            Message<?> requestMessage = MutableMessageBuilder.fromMessage(message)
                    .setReplyChannel(replyChannel)
                    .setHeader(this.messagingTemplate.getSendTimeoutHeader(), null)
                    .setHeader(this.messagingTemplate.getReceiveTimeoutHeader(), null)
                    .setErrorChannel(replyChannel)
                    .build();
            if (requestChannel instanceof ReactiveStreamsSubscribableChannel) {
                ((ReactiveStreamsSubscribableChannel) requestChannel).subscribeTo(Mono.just(requestMessage));
            } else {
                // Read the timeout from the original message: the copy no longer carries the header.
                long sendTimeout = this.sendTimeout(message);
                boolean sent = sendTimeout >= 0L
                        ? requestChannel.send(requestMessage, sendTimeout)
                        : requestChannel.send(requestMessage);
                if (!sent) {
                    throw new MessageDeliveryException(requestMessage, "Failed to send message to channel '" + requestChannel + "' within timeout: " + sendTimeout);
                }
            }
            return Mono.fromFuture(replyChannel.messageFuture)
                    .doOnSubscribe(s -> {
                        if (!error && this.countsEnabled) {
                            this.messageCount.incrementAndGet();
                        }
                    })
                    .<Message<?>>map(replyMessage -> MessageBuilder.fromMessage(replyMessage)
                            .setHeader("replyChannel", originalReplyChannelHeader)
                            .setHeader("errorChannel", originalErrorChannelHeader)
                            .build())
                    .onErrorResume(t -> error ? Mono.error(t) : this.handleSendError(requestMessage, t));
        });
    }

    /**
     * Handles a failure of the reactive send-and-receive operation.
     * <p>If an error channel is configured, an error message is built and run through
     * the reactive flow against that channel. Otherwise the failure is thrown, wrapped
     * in a {@link MessagingException} if it is not a runtime exception.
     *
     * @param requestMessage the request that failed
     * @param exception the failure
     * @return a {@code Mono} producing the error flow's reply
     */
    private Mono<Message<?>> handleSendError(Message<?> requestMessage, Throwable exception) {
        if (this.logger.isDebugEnabled()) {
            this.logger.debug((Object)("failure occurred in gateway sendAndReceiveReactive: " + exception.getMessage()));
        }
        MessageChannel errorChannel = this.getErrorChannel();
        if (errorChannel == null) {
            throw this.wrapExceptionIfNecessary(exception, "gateway received checked Exception");
        }
        ErrorMessage errorMessage = this.buildErrorMessage(requestMessage, exception);
        try {
            return this.doSendAndReceiveMessageReactive(errorChannel, errorMessage, true);
        }
        catch (Exception errorFlowFailure) {
            throw new MessagingException((Message)errorMessage, "failure occurred in error-handling flow", (Throwable)errorFlowFailure);
        }
    }

    /**
     * Determines the send timeout for the given message: the value of the template's
     * send-timeout header if the message carries a usable one, otherwise the
     * template's own send timeout.
     *
     * @param requestMessage the message whose headers are inspected
     * @return the send timeout to apply
     */
    private long sendTimeout(Message<?> requestMessage) {
        String headerName = this.messagingTemplate.getSendTimeoutHeader();
        Long fromHeader = this.headerToLong(requestMessage.getHeaders().get(headerName));
        if (fromHeader != null) {
            return fromHeader;
        }
        return this.messagingTemplate.getSendTimeout();
    }

    /**
     * Determines the receive timeout for the given message: the value of the
     * template's receive-timeout header if the message carries a usable one, otherwise
     * the template's own receive timeout.
     *
     * @param requestMessage the message whose headers are inspected
     * @return the receive timeout to apply
     */
    private long receiveTimeout(Message<?> requestMessage) {
        String headerName = this.messagingTemplate.getReceiveTimeoutHeader();
        Long fromHeader = this.headerToLong(requestMessage.getHeaders().get(headerName));
        if (fromHeader != null) {
            return fromHeader;
        }
        return this.messagingTemplate.getReceiveTimeout();
    }

    /**
     * Converts a header value to a {@code Long}.
     *
     * @param headerValue a {@link Number}, a {@link String} holding a decimal long, or
     * anything else
     * @return the numeric value, or {@code null} if the value is neither a number nor
     * a string (including {@code null})
     * @throws NumberFormatException if a string value cannot be parsed as a long
     */
    @Nullable
    private Long headerToLong(@Nullable Object headerValue) {
        Long converted = null;
        if (headerValue instanceof Number) {
            converted = ((Number) headerValue).longValue();
        } else if (headerValue instanceof String) {
            converted = Long.parseLong((String) headerValue);
        }
        return converted;
    }

    /**
     * Builds an error message for the given failure using the configured
     * {@link ErrorMessageStrategy} and the attributes derived from the request.
     *
     * @param requestMessage the request that failed; may be {@code null}
     * @param throwable the failure
     * @return the error message produced by the strategy
     */
    protected final ErrorMessage buildErrorMessage(Message<?> requestMessage, Throwable throwable) {
        AttributeAccessor attributes = this.getErrorMessageAttributes(requestMessage);
        return this.errorMessageStrategy.buildErrorMessage(throwable, attributes);
    }

    /**
     * Creates the attribute accessor handed to the error message strategy.
     * Subclasses may override this to supply additional attributes.
     *
     * @param message the request message the attributes are derived from
     * @return the accessor obtained from {@code ErrorMessageUtils.getAttributeAccessor}
     */
    protected AttributeAccessor getErrorMessageAttributes(Message<?> message) {
        return ErrorMessageUtils.getAttributeAccessor(message, null);
    }

    /**
     * Always throws: runtime exceptions are thrown as they are, anything else is
     * wrapped in a {@link MessagingException} carrying the given description.
     *
     * @param t the failure to throw
     * @param description the message used when wrapping is required
     */
    private void rethrow(Throwable t, String description) {
        RuntimeException toThrow = this.wrapExceptionIfNecessary(t, description);
        throw toThrow;
    }

    /**
     * Returns the given failure as a runtime exception.
     *
     * @param t the failure
     * @param description the message used when wrapping is required
     * @return {@code t} itself if it is a {@link RuntimeException}, otherwise a new
     * {@link MessagingException} with {@code t} as its cause
     */
    private RuntimeException wrapExceptionIfNecessary(Throwable t, String description) {
        return t instanceof RuntimeException
                ? (RuntimeException) t
                : new MessagingException(description, t);
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    /**
     * Lazily creates the endpoint that consumes from the configured reply channel
     * and bridges the replies onwards, and starts it if this gateway is running.
     * <p>Nothing happens when no reply channel is configured or a correlator already
     * exists. Creation is guarded by a dedicated monitor so that only one correlator
     * is ever created. The endpoint type depends on the reply channel type:
     * subscribable, pollable (using the reply timeout as receive timeout) or reactive
     * streams.
     * <p>The bean factory is only propagated when one is available, for the polling
     * endpoint as well as for the handler.
     *
     * @throws MessagingException if the reply channel is of an unsupported type
     */
    protected void registerReplyMessageCorrelatorIfNecessary() {
        MessageChannel replyChannel = this.getReplyChannel();
        if (replyChannel == null || this.replyMessageCorrelator != null) {
            return;
        }
        synchronized (this.replyMessageCorrelatorMonitor) {
            // Re-check under the lock: another thread may have created the correlator.
            if (this.replyMessageCorrelator != null) {
                return;
            }
            BridgeHandler handler = new BridgeHandler();
            if (this.getBeanFactory() != null) {
                handler.setBeanFactory(this.getBeanFactory());
            }
            handler.afterPropertiesSet();
            AbstractEndpoint correlator;
            if (replyChannel instanceof SubscribableChannel) {
                correlator = new EventDrivenConsumer((SubscribableChannel) replyChannel, handler);
            } else if (replyChannel instanceof PollableChannel) {
                PollingConsumer endpoint = new PollingConsumer((PollableChannel) replyChannel, handler);
                if (this.getBeanFactory() != null) {
                    endpoint.setBeanFactory(this.getBeanFactory());
                }
                endpoint.setReceiveTimeout(this.replyTimeout);
                endpoint.afterPropertiesSet();
                correlator = endpoint;
            } else if (replyChannel instanceof ReactiveStreamsSubscribableChannel) {
                ReactiveStreamsConsumer endpoint = new ReactiveStreamsConsumer(replyChannel, (Subscriber<Message<?>>) handler);
                endpoint.afterPropertiesSet();
                correlator = endpoint;
            } else {
                throw new MessagingException("Unsupported 'replyChannel' type [" + replyChannel.getClass() + "].SubscribableChannel or PollableChannel type are supported.");
            }
            this.replyMessageCorrelator = correlator;
        }
        // Only the thread that created the correlator reaches this point.
        if (this.isRunning()) {
            this.replyMessageCorrelator.start();
        }
    }

    /**
     * Starts the reply correlator, if one has been created.
     */
    @Override
    protected void doStart() {
        AbstractEndpoint correlator = this.replyMessageCorrelator;
        if (correlator == null) {
            return;
        }
        correlator.start();
    }

    /**
     * Stops the reply correlator, if one has been created.
     */
    @Override
    protected void doStop() {
        AbstractEndpoint correlator = this.replyMessageCorrelator;
        if (correlator == null) {
            return;
        }
        correlator.stop();
    }

    /**
     * Resets the message counter to zero.
     */
    @Override
    public void reset() {
        this.messageCount.set(0L);
    }

    /**
     * One-shot reply channel that captures the message sent to it in a
     * {@link CompletableFuture}.
     */
    private static class FutureReplyChannel
    implements MessageChannel {
        /** Completed with the message passed to {@link #send(Message, long)}. */
        private final CompletableFuture<Message<?>> messageFuture = new CompletableFuture<>();

        private FutureReplyChannel() {
        }

        /**
         * Completes the future with the given message; the timeout is ignored.
         *
         * @return {@code true} if this call completed the future, {@code false} if it
         * had already been completed
         */
        public boolean send(Message<?> message, long timeout) {
            boolean completedNow = this.messageFuture.complete(message);
            return completedNow;
        }
    }

    /**
     * Default request mapper: passes messages through unchanged and wraps any other
     * non-null object as the payload of a new message.
     */
    private static class DefaultRequestMapper
    implements InboundMessageMapper<Object> {
        /** Factory used to build messages for non-message requests. */
        private volatile MessageBuilderFactory messageBuilderFactory = new DefaultMessageBuilderFactory();

        DefaultRequestMapper() {
        }

        void setMessageBuilderFactory(MessageBuilderFactory messageBuilderFactory) {
            this.messageBuilderFactory = messageBuilderFactory;
        }

        /**
         * Maps the given object to a message.
         *
         * @param object the request; a {@link Message} is returned as-is
         * @param headers headers copied onto a newly built message where absent
         * @return the message, or {@code null} if {@code object} is {@code null}
         */
        @Override
        public Message<?> toMessage(Object object, @Nullable Map<String, Object> headers) throws Exception {
            if (object == null) {
                return null;
            }
            if (object instanceof Message) {
                return (Message) object;
            }
            return this.messageBuilderFactory.withPayload(object).copyHeadersIfAbsent(headers).build();
        }
    }
}

