package org.springframework.kafka.listener;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.kafka.listener.config.ContainerProperties;
import org.springframework.kafka.support.TopicPartitionInitialOffset;
import org.springframework.util.Assert;

/* loaded from: input_file:BOOT-INF/lib/spring-kafka-1.1.2.RELEASE.jar:org/springframework/kafka/listener/ConcurrentMessageListenerContainer.class */
public class ConcurrentMessageListenerContainer<K, V> extends AbstractMessageListenerContainer<K, V> {
    private final ConsumerFactory<K, V> consumerFactory;
    private final List<KafkaMessageListenerContainer<K, V>> containers;
    private int concurrency;

    public ConcurrentMessageListenerContainer(ConsumerFactory<K, V> consumerFactory, ContainerProperties containerProperties) {
        super(containerProperties);
        this.containers = new ArrayList();
        this.concurrency = 1;
        Assert.notNull(consumerFactory, "A ConsumerFactory must be provided");
        this.consumerFactory = consumerFactory;
    }

    public int getConcurrency() {
        return this.concurrency;
    }

    public void setConcurrency(int i) {
        Assert.isTrue(i > 0, "concurrency must be greater than 0");
        this.concurrency = i;
    }

    public List<KafkaMessageListenerContainer<K, V>> getContainers() {
        return Collections.unmodifiableList(this.containers);
    }

    @Override // org.springframework.kafka.listener.AbstractMessageListenerContainer
    protected void doStart() {
        if (isRunning()) {
            return;
        }
        ContainerProperties containerProperties = getContainerProperties();
        TopicPartitionInitialOffset[] topicPartitions = containerProperties.getTopicPartitions();
        if (topicPartitions != null && this.concurrency > topicPartitions.length) {
            this.logger.warn("When specific partitions are provided, the concurrency must be less than or equal to the number of partitions; reduced from " + this.concurrency + " to " + topicPartitions.length);
            this.concurrency = topicPartitions.length;
        }
        setRunning(true);
        for (int i = 0; i < this.concurrency; i++) {
            KafkaMessageListenerContainer<K, V> kafkaMessageListenerContainer = topicPartitions == null ? new KafkaMessageListenerContainer<>(this.consumerFactory, containerProperties) : new KafkaMessageListenerContainer<>(this.consumerFactory, containerProperties, partitionSubset(containerProperties, i));
            if (getBeanName() != null) {
                kafkaMessageListenerContainer.setBeanName(getBeanName() + "-" + i);
            }
            if (getApplicationEventPublisher() != null) {
                kafkaMessageListenerContainer.setApplicationEventPublisher(getApplicationEventPublisher());
            }
            kafkaMessageListenerContainer.start();
            this.containers.add(kafkaMessageListenerContainer);
        }
    }

    private TopicPartitionInitialOffset[] partitionSubset(ContainerProperties containerProperties, int i) {
        TopicPartitionInitialOffset[] topicPartitions = containerProperties.getTopicPartitions();
        if (this.concurrency == 1) {
            return topicPartitions;
        }
        int length = topicPartitions.length;
        if (length == this.concurrency) {
            return new TopicPartitionInitialOffset[]{topicPartitions[i]};
        }
        int i2 = length / this.concurrency;
        return i == this.concurrency - 1 ? (TopicPartitionInitialOffset[]) Arrays.copyOfRange(topicPartitions, i * i2, topicPartitions.length) : (TopicPartitionInitialOffset[]) Arrays.copyOfRange(topicPartitions, i * i2, (i + 1) * i2);
    }

    @Override // org.springframework.kafka.listener.AbstractMessageListenerContainer
    protected void doStop(final Runnable runnable) {
        final AtomicInteger atomicInteger = new AtomicInteger();
        if (isRunning()) {
            setRunning(false);
            Iterator<KafkaMessageListenerContainer<K, V>> it = this.containers.iterator();
            while (it.hasNext()) {
                if (it.next().isRunning()) {
                    atomicInteger.incrementAndGet();
                }
            }
            for (KafkaMessageListenerContainer<K, V> kafkaMessageListenerContainer : this.containers) {
                if (kafkaMessageListenerContainer.isRunning()) {
                    kafkaMessageListenerContainer.stop(new Runnable() { // from class: org.springframework.kafka.listener.ConcurrentMessageListenerContainer.1
                        @Override // java.lang.Runnable
                        public void run() {
                            if (atomicInteger.decrementAndGet() <= 0) {
                                runnable.run();
                            }
                        }
                    });
                }
            }
            this.containers.clear();
        }
    }

    public String toString() {
        return "ConcurrentMessageListenerContainer [concurrency=" + this.concurrency + ", beanName=" + getBeanName() + ", running=" + isRunning() + "]";
    }
}
