package org.springframework.cloud.stream.binder;

import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.function.BiConsumer;
import org.aopalliance.intercept.MethodInterceptor;
import org.aopalliance.intercept.MethodInvocation;
import org.eclipse.jgit.lib.ConfigConstants;
import org.springframework.aop.framework.ProxyFactory;
import org.springframework.aop.support.NameMatchMethodPointcutAdvisor;
import org.springframework.context.Lifecycle;
import org.springframework.core.AttributeAccessor;
import org.springframework.core.ParameterizedTypeReference;
import org.springframework.integration.core.MessageSource;
import org.springframework.integration.core.MessagingTemplate;
import org.springframework.integration.support.AckUtils;
import org.springframework.integration.support.AcknowledgmentCallback;
import org.springframework.integration.support.DefaultErrorMessageStrategy;
import org.springframework.integration.support.ErrorMessageStrategy;
import org.springframework.integration.support.ErrorMessageUtils;
import org.springframework.integration.support.StaticMessageHeaderAccessor;
import org.springframework.lang.Nullable;
import org.springframework.messaging.Message;
import org.springframework.messaging.MessageChannel;
import org.springframework.messaging.MessageHandler;
import org.springframework.messaging.MessageHandlingException;
import org.springframework.messaging.converter.MessageConversionException;
import org.springframework.messaging.converter.SmartMessageConverter;
import org.springframework.messaging.support.ChannelInterceptor;
import org.springframework.messaging.support.MessageBuilder;
import org.springframework.retry.RecoveryCallback;
import org.springframework.retry.RetryCallback;
import org.springframework.retry.RetryContext;
import org.springframework.retry.RetryListener;
import org.springframework.retry.support.RetryTemplate;
import org.springframework.util.Assert;

/* loaded from: input_file:BOOT-INF/lib/spring-cloud-stream-2.0.0.RELEASE.jar:org/springframework/cloud/stream/binder/DefaultPollableMessageSource.class */
public class DefaultPollableMessageSource implements PollableMessageSource, Lifecycle, RetryListener {
    protected static final ThreadLocal<AttributeAccessor> attributesHolder = new ThreadLocal<>();
    private final SmartMessageConverter messageConverter;
    private MessageSource<?> source;
    private RetryTemplate retryTemplate;
    private RecoveryCallback<Object> recoveryCallback;
    private MessageChannel errorChannel;
    private BiConsumer<AttributeAccessor, Message<?>> attributesProvider;
    private boolean running;
    private final List<ChannelInterceptor> interceptors = new ArrayList();
    private final MessagingTemplate messagingTemplate = new MessagingTemplate();
    private ErrorMessageStrategy errorMessageStrategy = new DefaultErrorMessageStrategy();

    public DefaultPollableMessageSource(@Nullable SmartMessageConverter smartMessageConverter) {
        this.messageConverter = smartMessageConverter;
    }

    /* JADX WARN: Type inference failed for: r0v1, types: [org.aopalliance.aop.Advice, org.springframework.cloud.stream.binder.DefaultPollableMessageSource$1ReceiveAdvice] */
    public void setSource(MessageSource<?> messageSource) {
        ProxyFactory proxyFactory = new ProxyFactory(messageSource);
        ?? r0 = new MethodInterceptor() { // from class: org.springframework.cloud.stream.binder.DefaultPollableMessageSource.1ReceiveAdvice
            private final List<ChannelInterceptor> interceptors = new ArrayList();

            @Override // org.aopalliance.intercept.MethodInterceptor
            public Object invoke(MethodInvocation methodInvocation) throws Throwable {
                Object proceed = methodInvocation.proceed();
                if (!(proceed instanceof Message)) {
                    return proceed;
                }
                Message<?> message = (Message) proceed;
                Iterator<ChannelInterceptor> it = this.interceptors.iterator();
                while (it.hasNext()) {
                    message = it.next().preSend(message, null);
                    if (message == null) {
                        return null;
                    }
                }
                return message;
            }
        };
        ((C1ReceiveAdvice) r0).interceptors.addAll(this.interceptors);
        NameMatchMethodPointcutAdvisor nameMatchMethodPointcutAdvisor = new NameMatchMethodPointcutAdvisor(r0);
        nameMatchMethodPointcutAdvisor.addMethodName(ConfigConstants.CONFIG_RECEIVE_SECTION);
        proxyFactory.addAdvisor(nameMatchMethodPointcutAdvisor);
        this.source = (MessageSource) proxyFactory.getProxy();
    }

    public void setRetryTemplate(RetryTemplate retryTemplate) {
        retryTemplate.registerListener(this);
        this.retryTemplate = retryTemplate;
    }

    public void setRecoveryCallback(RecoveryCallback<Object> recoveryCallback) {
        this.recoveryCallback = recoveryCallback;
    }

    public void setErrorChannel(MessageChannel messageChannel) {
        this.errorChannel = messageChannel;
    }

    public void setErrorMessageStrategy(ErrorMessageStrategy errorMessageStrategy) {
        Assert.notNull(errorMessageStrategy, "'errorMessageStrategy' cannot be null");
        this.errorMessageStrategy = errorMessageStrategy;
    }

    public void setAttributesProvider(BiConsumer<AttributeAccessor, Message<?>> biConsumer) {
        this.attributesProvider = biConsumer;
    }

    public void addInterceptor(ChannelInterceptor channelInterceptor) {
        this.interceptors.add(channelInterceptor);
    }

    public void addInterceptor(int i, ChannelInterceptor channelInterceptor) {
        this.interceptors.add(i, channelInterceptor);
    }

    @Override // org.springframework.context.Lifecycle
    public synchronized boolean isRunning() {
        return this.running;
    }

    @Override // org.springframework.context.Lifecycle
    public synchronized void start() {
        if (!this.running && (this.source instanceof Lifecycle)) {
            ((Lifecycle) this.source).start();
        }
        this.running = true;
    }

    @Override // org.springframework.context.Lifecycle
    public synchronized void stop() {
        if (this.running && (this.source instanceof Lifecycle)) {
            ((Lifecycle) this.source).stop();
        }
        this.running = false;
    }

    @Override // org.springframework.cloud.stream.binder.PollableSource
    public boolean poll(MessageHandler messageHandler) {
        return poll2(messageHandler, (ParameterizedTypeReference<?>) null);
    }

    /* renamed from: poll, reason: avoid collision after fix types in other method */
    public boolean poll2(MessageHandler messageHandler, ParameterizedTypeReference<?> parameterizedTypeReference) {
        Message<?> receive = receive(parameterizedTypeReference);
        if (receive == null) {
            return false;
        }
        AcknowledgmentCallback acknowledgmentCallback = StaticMessageHeaderAccessor.getAcknowledgmentCallback(receive);
        try {
            try {
                if (this.retryTemplate != null) {
                    this.retryTemplate.execute(retryContext -> {
                        handle(receive, messageHandler);
                        return null;
                    }, this.recoveryCallback);
                } else if (this.errorChannel == null) {
                    handle(receive, messageHandler);
                } else {
                    try {
                        handle(receive, messageHandler);
                    } catch (Exception e) {
                        this.messagingTemplate.send((MessagingTemplate) this.errorChannel, (Message<?>) this.errorMessageStrategy.buildErrorMessage(e, attributesHolder.get()));
                    }
                }
                return true;
            } catch (Exception e2) {
                AckUtils.autoNack(acknowledgmentCallback);
                if ((e2 instanceof MessageHandlingException) && ((MessageHandlingException) e2).getFailedMessage().equals(receive)) {
                    throw ((MessageHandlingException) e2);
                }
                throw new MessageHandlingException(receive, e2);
            }
        } finally {
            AckUtils.autoAck(acknowledgmentCallback);
        }
    }

    @Override // org.springframework.retry.RetryListener
    public <T, E extends Throwable> boolean open(RetryContext retryContext, RetryCallback<T, E> retryCallback) {
        if (this.recoveryCallback == null) {
            return true;
        }
        attributesHolder.set(retryContext);
        return true;
    }

    @Override // org.springframework.retry.RetryListener
    public <T, E extends Throwable> void close(RetryContext retryContext, RetryCallback<T, E> retryCallback, Throwable th) {
        attributesHolder.remove();
    }

    @Override // org.springframework.retry.RetryListener
    public <T, E extends Throwable> void onError(RetryContext retryContext, RetryCallback<T, E> retryCallback, Throwable th) {
    }

    private Message<?> receive(ParameterizedTypeReference<?> parameterizedTypeReference) {
        Message<?> receive = this.source.receive();
        if (receive != null && parameterizedTypeReference != null && this.messageConverter != null) {
            Object fromMessage = this.messageConverter.fromMessage(receive, parameterizedTypeReference == null ? Object.class : parameterizedTypeReference.getType() instanceof Class ? (Class) parameterizedTypeReference.getType() : Object.class, parameterizedTypeReference);
            if (fromMessage == null) {
                throw new MessageConversionException(receive, "No converter could convert Message");
            }
            receive = MessageBuilder.withPayload(fromMessage).copyHeaders(receive.getHeaders()).build();
        }
        return receive;
    }

    private void doHandleMessage(MessageHandler messageHandler, Message<?> message) {
        try {
            messageHandler.handleMessage(message);
        } catch (Throwable th) {
            throw new MessageHandlingException(message, th);
        }
    }

    private void setAttributesIfNecessary(Message<?> message) {
        AttributeAccessor attributeAccessor;
        boolean z = this.errorChannel != null && this.retryTemplate == null;
        boolean z2 = z || this.retryTemplate != null;
        if (z) {
            attributesHolder.set(ErrorMessageUtils.getAttributeAccessor(null, null));
        }
        if (!z2 || (attributeAccessor = attributesHolder.get()) == null) {
            return;
        }
        attributeAccessor.setAttribute(ErrorMessageUtils.INPUT_MESSAGE_CONTEXT_KEY, message);
        if (this.attributesProvider != null) {
            this.attributesProvider.accept(attributeAccessor, message);
        }
    }

    private void handle(Message<?> message, MessageHandler messageHandler) {
        setAttributesIfNecessary(message);
        doHandleMessage(messageHandler, message);
    }

    @Override // org.springframework.cloud.stream.binder.PollableSource
    public /* bridge */ /* synthetic */ boolean poll(MessageHandler messageHandler, ParameterizedTypeReference parameterizedTypeReference) {
        return poll2(messageHandler, (ParameterizedTypeReference<?>) parameterizedTypeReference);
    }
}
