package ru.curs.melbet.kafka.configuration;

import java.beans.ConstructorProperties;
import java.time.Duration;
import javax.annotation.PostConstruct;
import lombok.Generated;
import org.apache.kafka.streams.KafkaStreams;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.InitializingBean;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.boot.ExitCodeGenerator;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.condition.ConditionalOnMissingBean;
import org.springframework.context.ApplicationContext;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.config.StreamsBuilderFactoryBean;
import org.springframework.kafka.core.KafkaAdmin;
import ru.curs.melbet.kafka.metrics.KafkaSupportMetrics;
import ru.curs.melbet.kafka.utils.TopicChecker;
import ru.curs.melbet.metrics.EnableMetrics;

/**
 * Spring configuration for the Kafka support module.
 *
 * <p>On start-up it hands the configured {@link RequiredTopics} and the {@link KafkaAdmin}
 * configuration to {@link TopicChecker#ensureTopicsExist}. It also registers a
 * {@link KafkaSupportMetrics} bean and, unless a bean of the same type is already present,
 * a {@link StreamsBuilderFactoryBeanConfigurer} that installs an uncaught-exception handler
 * and a state listener on the {@code defaultKafkaStreamsBuilder} factory bean.
 */
@Configuration
@EnableMetrics
public class KafkaSupportAutoConfiguration {

    @Generated
    private static final Logger log = LoggerFactory.getLogger(KafkaSupportAutoConfiguration.class);

    /** Timeout passed to {@code KafkaStreams.close} when an uncaught exception is reported. */
    private static final Duration CLOSE_KAFKA_STREAMS_TIMEOUT = Duration.ofSeconds(10);

    private final KafkaAdmin kafkaAdmin;
    private final RequiredTopics requiredTopics;

    /**
     * Creates the configuration with its collaborators.
     *
     * @param kafkaAdmin     admin whose configuration is passed to the topic check
     * @param requiredTopics topics handed to the topic check in {@link #init()}
     */
    @Generated
    @ConstructorProperties({"kafkaAdmin", "requiredTopics"})
    public KafkaSupportAutoConfiguration(KafkaAdmin kafkaAdmin, RequiredTopics requiredTopics) {
        this.kafkaAdmin = kafkaAdmin;
        this.requiredTopics = requiredTopics;
    }

    /**
     * Runs the topic check once this configuration has been constructed.
     *
     * @throws InterruptedException if {@link TopicChecker#ensureTopicsExist} throws it
     */
    @PostConstruct
    public void init() throws InterruptedException {
        TopicChecker.ensureTopicsExist(requiredTopics, kafkaAdmin.getConfig());
    }

    /**
     * Provides the metrics bean used to record Kafka Streams state changes.
     *
     * @return a new {@link KafkaSupportMetrics} instance
     */
    @Bean
    KafkaSupportMetrics kafkaSupportMetrics() {
        return new KafkaSupportMetrics();
    }

    /**
     * Provides the configurer that attaches the handlers to the streams factory bean.
     * Skipped when a bean of this type is already defined.
     *
     * @param streamsBuilderFactoryBean the factory bean qualified as {@code defaultKafkaStreamsBuilder}
     * @param applicationContext        context passed to {@link SpringApplication#exit} on failure
     * @param kafkaSupportMetrics       metrics sink for state transitions
     * @return the configurer wrapping the given collaborators
     */
    @ConditionalOnMissingBean
    @Bean
    StreamsBuilderFactoryBeanConfigurer streamBuilderConfigurer(@Qualifier("defaultKafkaStreamsBuilder") StreamsBuilderFactoryBean streamsBuilderFactoryBean, ApplicationContext applicationContext, KafkaSupportMetrics kafkaSupportMetrics) {
        return new StreamsBuilderFactoryBeanConfigurer(streamsBuilderFactoryBean, applicationContext, kafkaSupportMetrics);
    }

    /**
     * Installs an uncaught-exception handler and a state listener on a
     * {@link StreamsBuilderFactoryBean} once this bean's properties have been set.
     */
    static class StreamsBuilderFactoryBeanConfigurer implements InitializingBean {
        private final StreamsBuilderFactoryBean factoryBean;
        private final ApplicationContext ctx;
        private final KafkaSupportMetrics metrics;

        /**
         * @param streamsBuilderFactoryBean factory bean that receives the handlers
         * @param applicationContext        context passed to {@link SpringApplication#exit}
         * @param kafkaSupportMetrics       metrics sink for state transitions
         */
        @Generated
        @ConstructorProperties({"factoryBean", "ctx", "metrics"})
        public StreamsBuilderFactoryBeanConfigurer(StreamsBuilderFactoryBean streamsBuilderFactoryBean, ApplicationContext applicationContext, KafkaSupportMetrics kafkaSupportMetrics) {
            this.factoryBean = streamsBuilderFactoryBean;
            this.ctx = applicationContext;
            this.metrics = kafkaSupportMetrics;
        }

        /** Registers both handlers on the factory bean: exception handler first, then state listener. */
        public void afterPropertiesSet() {
            factoryBean.setUncaughtExceptionHandler(this::closeStreamsAfterFailure);
            factoryBean.setStateListener(this::recordStateAndExitIfStopped);
        }

        /**
         * Logs the failure, then closes the Kafka Streams instance obtained from the
         * factory bean using {@code CLOSE_KAFKA_STREAMS_TIMEOUT}.
         */
        private void closeStreamsAfterFailure(Thread thread, Throwable failure) {
            String message = String.format("Uncaught exception in the thread %s", thread.getName());
            log.error(message, failure);
            factoryBean.getKafkaStreams().close(CLOSE_KAFKA_STREAMS_TIMEOUT);
            log.info("Kafka streams closed.");
        }

        /**
         * Records the state received as the listener's first argument and, when it is
         * {@code NOT_RUNNING} or {@code ERROR}, calls {@link SpringApplication#exit}
         * with an exit code generator that yields 1.
         */
        private void recordStateAndExitIfStopped(KafkaStreams.State newState, KafkaStreams.State previousState) {
            metrics.recordKafkaStreamsState(newState);
            if (newState != KafkaStreams.State.NOT_RUNNING && newState != KafkaStreams.State.ERROR) {
                return;
            }
            log.info("Kafka streams entered in state {}. Now exiting the application.", newState);
            // The returned exit code is intentionally not used here, matching existing behaviour.
            SpringApplication.exit(ctx, () -> 1);
        }
    }
}
