package org.apache.beam.sdk.io.gcp.spanner;

import com.google.cloud.spanner.AbortedException;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.collect.Iterables;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import org.apache.beam.sdk.io.gcp.spanner.SpannerIO;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.display.DisplayData;
import org.apache.beam.sdk.util.BackOff;
import org.apache.beam.sdk.util.BackOffUtils;
import org.apache.beam.sdk.util.FluentBackoff;
import org.apache.beam.sdk.util.Sleeper;
import org.joda.time.Duration;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* JADX INFO: Access modifiers changed from: package-private */
/**
 * Buffers incoming {@link MutationGroup}s and writes them to Cloud Spanner in batches.
 *
 * <p>Groups are accumulated until their estimated size (per {@link MutationSizeEstimator}) reaches
 * the batch size configured on the {@link SpannerIO.Write} spec, at which point the buffer is
 * flushed. Whatever is still buffered when the bundle finishes is flushed as well.
 *
 * <p>A flush that fails with an {@link AbortedException} is retried up to {@link #MAX_RETRIES}
 * times with backoff; once the retries are exhausted the last exception is rethrown.
 */
@VisibleForTesting
public class SpannerWriteGroupFn extends AbstractSpannerFn<MutationGroup, Void> {
    private static final Logger LOG = LoggerFactory.getLogger(SpannerWriteGroupFn.class);

    /** Maximum number of retries of a batch write before the failure is propagated. */
    private static final int MAX_RETRIES = 5;

    /** Backoff policy shared by all batch writes; each flush obtains a fresh {@link BackOff}. */
    private static final FluentBackoff BUNDLE_WRITE_BACKOFF =
            FluentBackoff.DEFAULT
                    .withMaxRetries(MAX_RETRIES)
                    .withInitialBackoff(Duration.standardSeconds(5));

    /** The write transform this function was created from; source of config and batch size. */
    private final SpannerIO.Write spec;

    /** Mutation groups buffered since the last successful flush. */
    private List<MutationGroup> mutations;

    /** Estimated total size, in bytes, of the groups currently held in {@link #mutations}. */
    private long batchSizeBytes = 0;

    /**
     * Creates a write function backed by the given write specification.
     *
     * @param write the {@link SpannerIO.Write} transform supplying the Spanner config and the
     *     batch size threshold
     */
    @VisibleForTesting
    public SpannerWriteGroupFn(SpannerIO.Write write) {
        this.spec = write;
    }

    /** Returns the Spanner configuration taken from the write specification. */
    @Override // org.apache.beam.sdk.io.gcp.spanner.AbstractSpannerFn
    SpannerConfig getSpannerConfig() {
        return this.spec.getSpannerConfig();
    }

    /**
     * Runs the parent setup and then starts with an empty batch.
     *
     * @throws Exception if the parent setup fails
     */
    @Override // org.apache.beam.sdk.io.gcp.spanner.AbstractSpannerFn
    @DoFn.Setup
    public void setup() throws Exception {
        super.setup();
        this.mutations = new ArrayList<>();
        this.batchSizeBytes = 0L;
    }

    /**
     * Adds the current element to the batch and flushes once the estimated batch size reaches the
     * configured threshold.
     *
     * @param processContext context providing the {@link MutationGroup} to buffer
     * @throws Exception if flushing the batch fails
     */
    @DoFn.ProcessElement
    public void processElement(DoFn<MutationGroup, Void>.ProcessContext processContext) throws Exception {
        MutationGroup mutationGroup = processContext.element();
        this.mutations.add(mutationGroup);
        this.batchSizeBytes += MutationSizeEstimator.sizeOf(mutationGroup);
        if (this.batchSizeBytes >= this.spec.getBatchSizeBytes()) {
            flushBatch();
        }
    }

    /**
     * Flushes any mutation groups still buffered at the end of the bundle.
     *
     * @throws Exception if flushing the batch fails
     */
    @DoFn.FinishBundle
    public void finishBundle() throws Exception {
        if (!this.mutations.isEmpty()) {
            flushBatch();
        }
    }

    /**
     * Writes all buffered mutation groups in a single {@code writeAtLeastOnce} call, retrying on
     * {@link AbortedException} according to {@link #BUNDLE_WRITE_BACKOFF}. The buffer and its size
     * counter are reset only after a successful write.
     *
     * @throws AbortedException the last failure, once the backoff allows no further retries
     * @throws IOException if the backoff policy fails while computing the next delay
     * @throws InterruptedException if the thread is interrupted while sleeping between retries
     */
    private void flushBatch() throws AbortedException, IOException, InterruptedException {
        LOG.debug("Writing batch of {} mutations", this.mutations.size());
        Sleeper sleeper = Sleeper.DEFAULT;
        BackOff backoff = BUNDLE_WRITE_BACKOFF.backoff();

        while (true) {
            try {
                databaseClient().writeAtLeastOnce(Iterables.concat(this.mutations));
                // The write threw nothing, so the batch is committed; stop retrying.
                break;
            } catch (AbortedException e) {
                // Log only code and message for each failed attempt; the exception itself is
                // propagated after the final attempt.
                LOG.error("Error writing to Spanner ({}): {}", e.getCode(), e.getMessage());
                // The rethrow must happen here, while the caught exception is still in scope.
                if (!BackOffUtils.next(sleeper, backoff)) {
                    LOG.error("Aborting after {} retries.", MAX_RETRIES);
                    throw e;
                }
            }
        }

        LOG.debug("Successfully wrote {} mutations", this.mutations.size());
        this.mutations = new ArrayList<>();
        this.batchSizeBytes = 0L;
    }

    /**
     * Registers display data from the parent class and from the write specification.
     *
     * @param builder the display data builder to populate
     */
    @Override
    public void populateDisplayData(DisplayData.Builder builder) {
        super.populateDisplayData(builder);
        this.spec.populateDisplayData(builder);
    }
}
