package org.springframework.kafka.core;

import java.util.Collections;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.Future;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.clients.producer.Callback;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.apache.kafka.common.Metric;
import org.apache.kafka.common.MetricName;
import org.apache.kafka.common.PartitionInfo;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.errors.OutOfOrderSequenceException;
import org.apache.kafka.common.errors.ProducerFencedException;
import org.apache.kafka.common.errors.TimeoutException;
import org.apache.kafka.common.serialization.Serializer;
import org.springframework.beans.factory.DisposableBean;
import org.springframework.context.Lifecycle;
import org.springframework.kafka.support.TransactionSupport;
import org.springframework.util.Assert;

/* loaded from: input_file:org/springframework/kafka/core/DefaultKafkaProducerFactory.class */
/**
 * {@link ProducerFactory} that hands out {@link CloseSafeProducer} wrappers around
 * {@link KafkaProducer} instances.
 *
 * <p>Without a transaction id prefix, a single shared producer is created lazily and returned from
 * every {@link #createProducer()} call. With a prefix, transactional producers are created; they are
 * either keyed by the suffix obtained from {@link TransactionSupport#getTransactionIdSuffix()}
 * (when {@code producerPerConsumerPartition} is set and a suffix is available) or taken from / returned
 * to an internal cache.
 *
 * <p>Calling {@code close()} on a returned producer does not normally close the underlying Kafka
 * producer; physical closing happens in {@link #destroy()}, {@link #closeProducerFor(String)}, or
 * when the wrapper has recorded a failure.
 *
 * @param <K> the record key type
 * @param <V> the record value type
 */
public class DefaultKafkaProducerFactory<K, V> implements ProducerFactory<K, V>, Lifecycle, DisposableBean {
    /** Default for {@link #setPhysicalCloseTimeout(int)}, in seconds. */
    private static final int DEFAULT_PHYSICAL_CLOSE_TIMEOUT = 30;
    private static final Log logger = LogFactory.getLog(DefaultKafkaProducerFactory.class);

    /** Private copy of the producer properties supplied at construction time. */
    private final Map<String, Object> configs;
    /** Source of suffixes for transactional ids of cache-managed producers. */
    private final AtomicInteger transactionIdSuffix = new AtomicInteger();
    /** Idle transactional producers available for reuse. */
    private final BlockingQueue<CloseSafeProducer<K, V>> cache = new LinkedBlockingQueue<>();
    /** Transactional producers keyed by transaction id suffix; guarded by its own monitor. */
    private final Map<String, CloseSafeProducer<K, V>> consumerProducers = new HashMap<>();
    /** Source of suffixes appended to {@link #clientIdPrefix}. */
    private final AtomicInteger clientIdCounter = new AtomicInteger();
    /** The shared non-transactional producer; created lazily, written under {@code synchronized (this)}. */
    private volatile CloseSafeProducer<K, V> producer;
    private Serializer<K> keySerializer;
    private Serializer<V> valueSerializer;
    /** Timeout, in seconds, used when physically closing a Kafka producer. */
    private int physicalCloseTimeout = DEFAULT_PHYSICAL_CLOSE_TIMEOUT;
    private String transactionIdPrefix;
    private volatile boolean running;
    private boolean producerPerConsumerPartition = true;
    /** Value of the {@code client.id} property, if it was supplied as a String; otherwise null. */
    private String clientIdPrefix;

    /**
     * {@link Producer} wrapper whose {@code close} methods do not necessarily close the delegate.
     *
     * <p>Failures of {@code beginTransaction}, {@code commitTransaction}, {@code abortTransaction} and
     * an {@link OutOfOrderSequenceException} reported to a send callback are recorded; once a failure
     * is recorded, the next {@code close} physically closes the delegate and notifies the
     * {@link Remover}, if any.
     *
     * @param <K> the record key type
     * @param <V> the record value type
     */
    public static class CloseSafeProducer<K, V> implements Producer<K, V> {
        private final Producer<K, V> delegate;
        /** Queue this producer is offered to on close; null when the producer is not cache-managed. */
        private final BlockingQueue<CloseSafeProducer<K, V>> cache;
        private final String txId;
        /** Notified after a failed producer has been physically closed; may be null. */
        private final Remover<K, V> remover;
        /** Physical close timeout in seconds (the factory passes its {@code physicalCloseTimeout}). */
        private final int closeTimeout;
        private volatile Exception producerFailed;
        private volatile boolean closed;

        CloseSafeProducer(Producer<K, V> producer, Remover<K, V> remover, int closeTimeout) {
            this(producer, null, remover, null, closeTimeout);
            Assert.isTrue(!(producer instanceof CloseSafeProducer), "Cannot double-wrap a producer");
        }

        CloseSafeProducer(Producer<K, V> producer, BlockingQueue<CloseSafeProducer<K, V>> cache, int closeTimeout) {
            this(producer, cache, null, closeTimeout);
        }

        CloseSafeProducer(Producer<K, V> producer, BlockingQueue<CloseSafeProducer<K, V>> cache,
                Remover<K, V> remover, int closeTimeout) {
            this(producer, cache, remover, null, closeTimeout);
        }

        CloseSafeProducer(Producer<K, V> producer, BlockingQueue<CloseSafeProducer<K, V>> cache,
                Remover<K, V> remover, String txId, int closeTimeout) {
            this.delegate = producer;
            this.cache = cache;
            this.remover = remover;
            this.txId = txId;
            this.closeTimeout = closeTimeout;
        }

        Producer<K, V> getDelegate() {
            return this.delegate;
        }

        public Future<RecordMetadata> send(ProducerRecord<K, V> producerRecord) {
            return this.delegate.send(producerRecord);
        }

        /**
         * Sends via the delegate, wrapping the callback so that an
         * {@link OutOfOrderSequenceException} is recorded as a failure and triggers a close of this
         * producer before the caller's callback runs.
         *
         * @param producerRecord the record to send
         * @param callback the caller's callback; may be null
         * @return the future returned by the delegate
         */
        public Future<RecordMetadata> send(ProducerRecord<K, V> producerRecord, final Callback callback) {
            return this.delegate.send(producerRecord, new Callback() {
                public void onCompletion(RecordMetadata recordMetadata, Exception exc) {
                    try {
                        if (exc instanceof OutOfOrderSequenceException) {
                            CloseSafeProducer.this.producerFailed = exc;
                            // closeTimeout is expressed in seconds.
                            CloseSafeProducer.this.close(CloseSafeProducer.this.closeTimeout, TimeUnit.SECONDS);
                        }
                    } finally {
                        // Always notify the caller, even if closing the failed producer threw.
                        if (callback != null) {
                            callback.onCompletion(recordMetadata, exc);
                        }
                    }
                }
            });
        }

        public void flush() {
            this.delegate.flush();
        }

        public List<PartitionInfo> partitionsFor(String topic) {
            return this.delegate.partitionsFor(topic);
        }

        public Map<MetricName, ? extends Metric> metrics() {
            return this.delegate.metrics();
        }

        public void initTransactions() {
            this.delegate.initTransactions();
        }

        /**
         * Delegates to the wrapped producer; a {@link RuntimeException} is logged, recorded as this
         * producer's failure, and rethrown.
         */
        public void beginTransaction() throws ProducerFencedException {
            if (DefaultKafkaProducerFactory.logger.isDebugEnabled()) {
                DefaultKafkaProducerFactory.logger.debug("beginTransaction: " + this);
            }
            try {
                this.delegate.beginTransaction();
            } catch (RuntimeException e) {
                if (DefaultKafkaProducerFactory.logger.isErrorEnabled()) {
                    DefaultKafkaProducerFactory.logger.error("beginTransaction failed: " + this, e);
                }
                this.producerFailed = e;
                throw e;
            }
        }

        public void sendOffsetsToTransaction(Map<TopicPartition, OffsetAndMetadata> offsets, String consumerGroupId)
                throws ProducerFencedException {
            this.delegate.sendOffsetsToTransaction(offsets, consumerGroupId);
        }

        /**
         * Delegates to the wrapped producer; a {@link RuntimeException} is logged, recorded as this
         * producer's failure, and rethrown.
         */
        public void commitTransaction() throws ProducerFencedException {
            if (DefaultKafkaProducerFactory.logger.isDebugEnabled()) {
                DefaultKafkaProducerFactory.logger.debug("commitTransaction: " + this);
            }
            try {
                this.delegate.commitTransaction();
            } catch (RuntimeException e) {
                if (DefaultKafkaProducerFactory.logger.isErrorEnabled()) {
                    DefaultKafkaProducerFactory.logger.error("commitTransaction failed: " + this, e);
                }
                this.producerFailed = e;
                throw e;
            }
        }

        /**
         * Delegates to the wrapped producer; a {@link RuntimeException} is logged, recorded as this
         * producer's failure, and rethrown.
         */
        public void abortTransaction() throws ProducerFencedException {
            if (DefaultKafkaProducerFactory.logger.isDebugEnabled()) {
                DefaultKafkaProducerFactory.logger.debug("abortTransaction: " + this);
            }
            try {
                this.delegate.abortTransaction();
            } catch (RuntimeException e) {
                if (DefaultKafkaProducerFactory.logger.isErrorEnabled()) {
                    DefaultKafkaProducerFactory.logger.error("Abort failed: " + this, e);
                }
                this.producerFailed = e;
                throw e;
            }
        }

        /** Equivalent to {@code close(0L, null)}. */
        public void close() {
            close(0L, null);
        }

        /**
         * Releases this producer.
         *
         * <ul>
         * <li>Already physically closed: does nothing.</li>
         * <li>A failure was recorded: physically closes the delegate (with a zero timeout when the
         * failure is a {@link TimeoutException} or {@code timeUnit} is null) and notifies the
         * remover, if any.</li>
         * <li>No cache, or a remover is present: does nothing; the delegate stays open.</li>
         * <li>Otherwise: offers this producer back to the cache, physically closing the delegate
         * only if the cache does not accept it.</li>
         * </ul>
         *
         * @param timeout the timeout for a physical close after a failure
         * @param timeUnit the unit of {@code timeout}; may be null
         */
        public void close(long timeout, TimeUnit timeUnit) {
            if (this.closed) {
                return;
            }
            if (this.producerFailed != null) {
                if (DefaultKafkaProducerFactory.logger.isWarnEnabled()) {
                    DefaultKafkaProducerFactory.logger.warn("Error during transactional operation; producer removed from cache; possible cause: broker restarted during transaction: " + this);
                }
                this.closed = true;
                try {
                    if (timeUnit == null || this.producerFailed instanceof TimeoutException) {
                        // Zero timeout; the unit is irrelevant but must not be null for the delegate.
                        this.delegate.close(0L, timeUnit == null ? TimeUnit.MILLISECONDS : timeUnit);
                    } else {
                        this.delegate.close(timeout, timeUnit);
                    }
                } finally {
                    // Drop the failed producer from the factory even if the physical close threw.
                    if (this.remover != null) {
                        this.remover.remove(this);
                    }
                }
                return;
            }
            if (this.cache == null || this.remover != null) {
                return;
            }
            synchronized (this) {
                if (!this.cache.contains(this) && !this.cache.offer(this)) {
                    this.closed = true;
                    // closeTimeout is expressed in seconds, independent of the caller's unit.
                    this.delegate.close(this.closeTimeout, TimeUnit.SECONDS);
                }
            }
        }

        public String toString() {
            String txPart = this.txId != null ? ", txId=" + this.txId : "";
            return "CloseSafeProducer [delegate=" + this.delegate + txPart + "]";
        }
    }

    /**
     * Callback used by a {@link CloseSafeProducer} to tell its owner that it has been physically
     * closed after a failure and must no longer be handed out.
     *
     * @param <K> the record key type
     * @param <V> the record value type
     */
    public interface Remover<K, V> {
        void remove(CloseSafeProducer<K, V> closeSafeProducer);
    }

    /**
     * Creates a factory with the given producer properties and no explicit serializers.
     *
     * @param map the producer configuration properties
     */
    public DefaultKafkaProducerFactory(Map<String, Object> map) {
        this(map, null, null);
    }

    /**
     * Creates a factory with the given producer properties and serializers. The properties are
     * copied; if {@code client.id} is present as a String it becomes the prefix for generated
     * client ids.
     *
     * @param map the producer configuration properties
     * @param serializer the key serializer; may be null
     * @param serializer2 the value serializer; may be null
     */
    public DefaultKafkaProducerFactory(Map<String, Object> map, Serializer<K> serializer, Serializer<V> serializer2) {
        this.configs = new HashMap<>(map);
        this.keySerializer = serializer;
        this.valueSerializer = serializer2;
        Object clientId = map.get("client.id");
        if (clientId instanceof String) {
            this.clientIdPrefix = (String) clientId;
        }
    }

    public void setKeySerializer(Serializer<K> serializer) {
        this.keySerializer = serializer;
    }

    public void setValueSerializer(Serializer<V> serializer) {
        this.valueSerializer = serializer;
    }

    /**
     * Sets the timeout used when physically closing a Kafka producer.
     *
     * @param physicalCloseTimeout the timeout in seconds
     */
    public void setPhysicalCloseTimeout(int physicalCloseTimeout) {
        this.physicalCloseTimeout = physicalCloseTimeout;
    }

    /**
     * Sets the prefix for {@code transactional.id}; setting it makes this factory create
     * transactional producers.
     *
     * @param transactionIdPrefix the prefix; must not be null
     */
    public void setTransactionIdPrefix(String transactionIdPrefix) {
        Assert.notNull(transactionIdPrefix, "'transactionIdPrefix' cannot be null");
        this.transactionIdPrefix = transactionIdPrefix;
    }

    public void setProducerPerConsumerPartition(boolean producerPerConsumerPartition) {
        this.producerPerConsumerPartition = producerPerConsumerPartition;
    }

    public boolean isProducerPerConsumerPartition() {
        return this.producerPerConsumerPartition;
    }

    /**
     * @return an unmodifiable view of this factory's producer properties
     */
    public Map<String, Object> getConfigurationProperties() {
        return Collections.unmodifiableMap(this.configs);
    }

    @Override // org.springframework.kafka.core.ProducerFactory
    public boolean transactionCapable() {
        return this.transactionIdPrefix != null;
    }

    /**
     * Physically closes every producer this factory holds: the shared producer, all producers in
     * the cache, and all producers keyed by transaction id suffix.
     */
    public void destroy() {
        CloseSafeProducer<K, V> shared;
        synchronized (this) {
            shared = this.producer;
            this.producer = null;
        }
        if (shared != null) {
            shared.delegate.close(this.physicalCloseTimeout, TimeUnit.SECONDS);
        }
        // Drain into a local so the shared 'producer' field is never pointed at a cached
        // transactional producer, and close the polled producer itself.
        CloseSafeProducer<K, V> cached = this.cache.poll();
        while (cached != null) {
            try {
                cached.delegate.close(this.physicalCloseTimeout, TimeUnit.SECONDS);
            } catch (Exception e) {
                logger.error("Exception while closing producer", e);
            }
            cached = this.cache.poll();
        }
        synchronized (this.consumerProducers) {
            for (CloseSafeProducer<K, V> consumerProducer : this.consumerProducers.values()) {
                consumerProducer.delegate.close(this.physicalCloseTimeout, TimeUnit.SECONDS);
            }
            this.consumerProducers.clear();
        }
    }

    public void start() {
        this.running = true;
    }

    /**
     * Calls {@link #destroy()} and then clears the running flag; an exception from
     * {@code destroy()} is logged and the flag is left unchanged.
     */
    public void stop() {
        try {
            destroy();
            this.running = false;
        } catch (Exception e) {
            logger.error("Exception while closing producer", e);
        }
    }

    public boolean isRunning() {
        return this.running;
    }

    @Override // org.springframework.kafka.core.ProducerFactory
    public Producer<K, V> createProducer() {
        if (this.transactionIdPrefix != null) {
            return this.producerPerConsumerPartition
                    ? createTransactionalProducerForPartition()
                    : createTransactionalProducer();
        }
        synchronized (this) {
            if (this.producer == null) {
                this.producer = new CloseSafeProducer<>(createKafkaProducer(), standardProducerRemover(),
                        this.physicalCloseTimeout);
            }
            return this.producer;
        }
    }

    /**
     * Creates a raw Kafka producer. When a client id prefix is known, each producer gets a
     * {@code client.id} of the form {@code prefix-N} with an increasing counter.
     *
     * @return the new producer
     */
    protected Producer<K, V> createKafkaProducer() {
        if (this.clientIdPrefix == null) {
            return new KafkaProducer<>(this.configs, this.keySerializer, this.valueSerializer);
        }
        Map<String, Object> props = new HashMap<>(this.configs);
        props.put("client.id", this.clientIdPrefix + "-" + this.clientIdCounter.incrementAndGet());
        return new KafkaProducer<>(props, this.keySerializer, this.valueSerializer);
    }

    /**
     * Returns the transactional producer registered for the suffix reported by
     * {@link TransactionSupport#getTransactionIdSuffix()}, creating and registering it on first
     * use. Falls back to {@link #createTransactionalProducer()} when no suffix is available.
     */
    Producer<K, V> createTransactionalProducerForPartition() {
        String suffix = TransactionSupport.getTransactionIdSuffix();
        if (suffix == null) {
            return createTransactionalProducer();
        }
        synchronized (this.consumerProducers) {
            if (this.consumerProducers.containsKey(suffix)) {
                return this.consumerProducers.get(suffix);
            }
            CloseSafeProducer<K, V> created = doCreateTxProducer(suffix, true);
            this.consumerProducers.put(suffix, created);
            return created;
        }
    }

    /**
     * Forgets the shared producer if it is the given one, so that the next
     * {@link #createProducer()} call creates a fresh one.
     *
     * @param closeSafeProducer the producer to remove
     */
    protected final synchronized void removeProducer(CloseSafeProducer<K, V> closeSafeProducer) {
        if (closeSafeProducer.equals(this.producer)) {
            this.producer = null;
        }
    }

    /**
     * Returns an idle transactional producer from the cache, or creates a new one with the next
     * numeric transaction id suffix when the cache is empty.
     *
     * @return the producer
     */
    protected Producer<K, V> createTransactionalProducer() {
        CloseSafeProducer<K, V> cached = this.cache.poll();
        if (cached != null) {
            return cached;
        }
        return doCreateTxProducer("" + this.transactionIdSuffix.getAndIncrement(), false);
    }

    /**
     * Creates a Kafka producer with {@code transactional.id = transactionIdPrefix + suffix},
     * initializes transactions on it, and wraps it.
     *
     * @param suffix the transaction id suffix
     * @param consumerProducer true to attach the remover that unregisters the producer from the
     *        suffix-keyed map; false for a cache-managed producer with no remover
     * @return the wrapped producer
     */
    private CloseSafeProducer<K, V> doCreateTxProducer(String suffix, boolean consumerProducer) {
        Map<String, Object> props = new HashMap<>(this.configs);
        String transactionalId = this.transactionIdPrefix + suffix;
        props.put("transactional.id", transactionalId);
        if (this.clientIdPrefix != null) {
            props.put("client.id", this.clientIdPrefix + "-" + this.clientIdCounter.incrementAndGet());
        }
        Producer<K, V> kafkaProducer = new KafkaProducer<>(props, this.keySerializer, this.valueSerializer);
        try {
            kafkaProducer.initTransactions();
        } catch (RuntimeException e) {
            // The producer was never handed out, so nobody else can close it: release it here.
            try {
                kafkaProducer.close(this.physicalCloseTimeout, TimeUnit.SECONDS);
            } catch (RuntimeException closeFailure) {
                logger.error("Exception while closing producer", closeFailure);
            }
            throw e;
        }
        Remover<K, V> remover = consumerProducer ? consumerProducerRemover() : null;
        return new CloseSafeProducer<>(kafkaProducer, this.cache, remover, transactionalId,
                this.physicalCloseTimeout);
    }

    /** Remover that forwards to {@link #removeProducer(CloseSafeProducer)}. */
    private Remover<K, V> standardProducerRemover() {
        return new Remover<K, V>() {
            @Override // org.springframework.kafka.core.DefaultKafkaProducerFactory.Remover
            public void remove(CloseSafeProducer<K, V> closeSafeProducer) {
                DefaultKafkaProducerFactory.this.removeProducer(closeSafeProducer);
            }
        };
    }

    /** Remover that forwards to {@link #removeConsumerProducer(CloseSafeProducer)}. */
    private Remover<K, V> consumerProducerRemover() {
        return new Remover<K, V>() {
            @Override // org.springframework.kafka.core.DefaultKafkaProducerFactory.Remover
            public void remove(CloseSafeProducer<K, V> closeSafeProducer) {
                DefaultKafkaProducerFactory.this.removeConsumerProducer(closeSafeProducer);
            }
        };
    }

    /**
     * Removes the first suffix-keyed entry whose producer equals the given one.
     *
     * @param closeSafeProducer the producer to unregister
     */
    public void removeConsumerProducer(CloseSafeProducer<K, V> closeSafeProducer) {
        synchronized (this.consumerProducers) {
            Iterator<Map.Entry<String, CloseSafeProducer<K, V>>> entries =
                    this.consumerProducers.entrySet().iterator();
            while (entries.hasNext()) {
                if (entries.next().getValue().equals(closeSafeProducer)) {
                    entries.remove();
                    return;
                }
            }
        }
    }

    protected BlockingQueue<CloseSafeProducer<K, V>> getCache() {
        return this.cache;
    }

    /**
     * Unregisters and physically closes the producer keyed by the given transaction id suffix.
     * Does nothing unless {@code producerPerConsumerPartition} is set.
     *
     * @param transactionIdSuffix the suffix the producer was registered under
     */
    public void closeProducerFor(String transactionIdSuffix) {
        if (!this.producerPerConsumerPartition) {
            return;
        }
        synchronized (this.consumerProducers) {
            CloseSafeProducer<K, V> removed = this.consumerProducers.remove(transactionIdSuffix);
            if (removed != null) {
                removed.delegate.close(this.physicalCloseTimeout, TimeUnit.SECONDS);
            }
        }
    }
}
