package org.apache.iceberg.aws.dynamodb;

import java.util.List;
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import org.apache.iceberg.CatalogProperties;
import org.apache.iceberg.MetadataColumns;
import org.apache.iceberg.aws.AwsClientFactories;
import org.apache.iceberg.relocated.com.google.common.annotations.VisibleForTesting;
import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
import org.apache.iceberg.relocated.com.google.common.collect.Lists;
import org.apache.iceberg.relocated.com.google.common.collect.Maps;
import org.apache.iceberg.util.LockManagers;
import org.apache.iceberg.util.Tasks;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import software.amazon.awssdk.services.dynamodb.DynamoDbClient;
import software.amazon.awssdk.services.dynamodb.model.AttributeDefinition;
import software.amazon.awssdk.services.dynamodb.model.AttributeValue;
import software.amazon.awssdk.services.dynamodb.model.BillingMode;
import software.amazon.awssdk.services.dynamodb.model.ConditionalCheckFailedException;
import software.amazon.awssdk.services.dynamodb.model.CreateTableRequest;
import software.amazon.awssdk.services.dynamodb.model.DeleteItemRequest;
import software.amazon.awssdk.services.dynamodb.model.DescribeTableRequest;
import software.amazon.awssdk.services.dynamodb.model.DynamoDbException;
import software.amazon.awssdk.services.dynamodb.model.GetItemRequest;
import software.amazon.awssdk.services.dynamodb.model.GetItemResponse;
import software.amazon.awssdk.services.dynamodb.model.InternalServerErrorException;
import software.amazon.awssdk.services.dynamodb.model.KeySchemaElement;
import software.amazon.awssdk.services.dynamodb.model.KeyType;
import software.amazon.awssdk.services.dynamodb.model.ProvisionedThroughputExceededException;
import software.amazon.awssdk.services.dynamodb.model.PutItemRequest;
import software.amazon.awssdk.services.dynamodb.model.RequestLimitExceededException;
import software.amazon.awssdk.services.dynamodb.model.ResourceNotFoundException;
import software.amazon.awssdk.services.dynamodb.model.ScalarAttributeType;
import software.amazon.awssdk.services.dynamodb.model.TableStatus;
import software.amazon.awssdk.services.dynamodb.model.TransactionConflictException;

/* loaded from: input_file:org/apache/iceberg/aws/dynamodb/DynamoDbLockManager.class */
public class DynamoDbLockManager extends LockManagers.BaseLockManager {
    private static final String COL_LEASE_DURATION_MS = "leaseDurationMs";
    private static final int LOCK_TABLE_CREATION_WAIT_ATTEMPTS_MAX = 5;
    private static final int RELEASE_RETRY_ATTEMPTS_MAX = 5;
    private final Map<String, DynamoDbHeartbeat> heartbeats = Maps.newHashMap();
    private DynamoDbClient dynamo;
    private String lockTableName;
    private static final Logger LOG = LoggerFactory.getLogger(DynamoDbLockManager.class);
    private static final String COL_LOCK_ENTITY_ID = "entityId";
    private static final String COL_LOCK_OWNER_ID = "ownerId";
    private static final String CONDITION_LOCK_ID_MATCH = String.format("%s = :eid AND %s = :oid", COL_LOCK_ENTITY_ID, COL_LOCK_OWNER_ID);
    private static final String CONDITION_LOCK_ENTITY_NOT_EXIST = String.format("attribute_not_exists(%s)", COL_LOCK_ENTITY_ID);
    private static final String COL_VERSION = "version";
    private static final String CONDITION_LOCK_ENTITY_NOT_EXIST_OR_VERSION_MATCH = String.format("attribute_not_exists(%s) OR (%s = :eid AND %s = :vid)", COL_LOCK_ENTITY_ID, COL_LOCK_ENTITY_ID, COL_VERSION);
    private static final List<KeySchemaElement> LOCK_TABLE_SCHEMA = Lists.newArrayList((KeySchemaElement) KeySchemaElement.builder().attributeName(COL_LOCK_ENTITY_ID).keyType(KeyType.HASH).build());
    private static final List<AttributeDefinition> LOCK_TABLE_COL_DEFINITIONS = Lists.newArrayList((AttributeDefinition) AttributeDefinition.builder().attributeName(COL_LOCK_ENTITY_ID).attributeType(ScalarAttributeType.S).build());

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/iceberg/aws/dynamodb/DynamoDbLockManager$DynamoDbHeartbeat.class */
    public static class DynamoDbHeartbeat implements Runnable {
        private final DynamoDbClient dynamo;
        private final String lockTableName;
        private final long intervalMs;
        private final long timeoutMs;
        private final String entityId;
        private final String ownerId;
        private ScheduledFuture<?> future = null;

        DynamoDbHeartbeat(DynamoDbClient dynamoDbClient, String str, long j, long j2, String str2, String str3) {
            this.dynamo = dynamoDbClient;
            this.lockTableName = str;
            this.intervalMs = j;
            this.timeoutMs = j2;
            this.entityId = str2;
            this.ownerId = str3;
        }

        @Override // java.lang.Runnable
        public void run() {
            try {
                this.dynamo.putItem((PutItemRequest) PutItemRequest.builder().tableName(this.lockTableName).item(DynamoDbLockManager.toNewItem(this.entityId, this.ownerId, this.timeoutMs)).conditionExpression(DynamoDbLockManager.CONDITION_LOCK_ID_MATCH).expressionAttributeValues(DynamoDbLockManager.toLockIdValues(this.entityId, this.ownerId)).build());
            } catch (RuntimeException e) {
                DynamoDbLockManager.LOG.error("Failed to heartbeat for entity: {}, owner: {}", new Object[]{this.entityId, this.ownerId, e});
            } catch (ConditionalCheckFailedException e2) {
                DynamoDbLockManager.LOG.error("Fail to heartbeat for entity: {}, owner: {} due to conditional check failure, unsafe concurrent commits might be going on", new Object[]{this.entityId, this.ownerId, e2});
            }
        }

        public String ownerId() {
            return this.ownerId;
        }

        public void schedule(ScheduledExecutorService scheduledExecutorService) {
            this.future = scheduledExecutorService.scheduleAtFixedRate(this, 0L, this.intervalMs, TimeUnit.MILLISECONDS);
        }

        public void cancel() {
            if (this.future != null) {
                this.future.cancel(false);
            }
        }
    }

    public DynamoDbLockManager() {
    }

    public DynamoDbLockManager(DynamoDbClient dynamoDbClient, String str) {
        super.initialize(Maps.newHashMap());
        this.dynamo = dynamoDbClient;
        this.lockTableName = str;
        ensureLockTableExistsOrCreate();
    }

    private void ensureLockTableExistsOrCreate() {
        if (tableExists(this.lockTableName)) {
            return;
        }
        LOG.info("Dynamo lock table {} not found, trying to create", this.lockTableName);
        this.dynamo.createTable((CreateTableRequest) CreateTableRequest.builder().tableName(this.lockTableName).keySchema(lockTableSchema()).attributeDefinitions(lockTableColDefinitions()).billingMode(BillingMode.PAY_PER_REQUEST).build());
        Tasks.foreach(this.lockTableName).retry(5).throwFailureWhenFinished().onlyRetryOn(IllegalStateException.class).run(this::checkTableActive);
    }

    @VisibleForTesting
    boolean tableExists(String str) {
        try {
            this.dynamo.describeTable((DescribeTableRequest) DescribeTableRequest.builder().tableName(str).build());
            return true;
        } catch (ResourceNotFoundException e) {
            return false;
        }
    }

    private void checkTableActive(String str) {
        try {
            TableStatus tableStatus = this.dynamo.describeTable((DescribeTableRequest) DescribeTableRequest.builder().tableName(str).build()).table().tableStatus();
            if (tableStatus.equals(TableStatus.ACTIVE)) {
            } else {
                throw new IllegalStateException(String.format("Dynamo table %s is not active, current status: %s", str, tableStatus));
            }
        } catch (ResourceNotFoundException e) {
            throw new IllegalStateException(String.format("Cannot find Dynamo table %s", str));
        }
    }

    @Override // org.apache.iceberg.util.LockManagers.BaseLockManager, org.apache.iceberg.LockManager
    public void initialize(Map<String, String> map) {
        super.initialize(map);
        this.dynamo = AwsClientFactories.from(map).dynamo();
        this.lockTableName = map.get(CatalogProperties.LOCK_TABLE);
        Preconditions.checkNotNull(this.lockTableName, "DynamoDB lock table name must not be null");
        ensureLockTableExistsOrCreate();
    }

    @Override // org.apache.iceberg.LockManager
    public boolean acquire(String str, String str2) {
        try {
            Tasks.foreach(str).throwFailureWhenFinished().retry(MetadataColumns.FILE_PATH_COLUMN_ID).exponentialBackoff(acquireIntervalMs(), acquireIntervalMs(), acquireTimeoutMs(), 1.0d).onlyRetryOn(ConditionalCheckFailedException.class, ProvisionedThroughputExceededException.class, TransactionConflictException.class, RequestLimitExceededException.class, InternalServerErrorException.class).run(str3 -> {
                acquireOnce(str3, str2);
            });
            return true;
        } catch (DynamoDbException e) {
            return false;
        }
    }

    @VisibleForTesting
    void acquireOnce(String str, String str2) {
        GetItemResponse item = this.dynamo.getItem((GetItemRequest) GetItemRequest.builder().tableName(this.lockTableName).consistentRead(true).key(toKey(str)).build());
        if (item.hasItem()) {
            Map item2 = item.item();
            try {
                Thread.sleep(Long.parseLong(((AttributeValue) item2.get(COL_LEASE_DURATION_MS)).n()));
                this.dynamo.putItem((PutItemRequest) PutItemRequest.builder().tableName(this.lockTableName).item(toNewItem(str, str2, heartbeatTimeoutMs())).conditionExpression(CONDITION_LOCK_ENTITY_NOT_EXIST_OR_VERSION_MATCH).expressionAttributeValues(ImmutableMap.of(":eid", (AttributeValue) AttributeValue.builder().s(str).build(), ":vid", (AttributeValue) AttributeValue.builder().s(((AttributeValue) item2.get(COL_VERSION)).s()).build())).build());
            } catch (InterruptedException e) {
                throw new IllegalStateException(String.format("Fail to acquire lock %s by %s, interrupted during sleep", str, str2), e);
            }
        } else {
            this.dynamo.putItem((PutItemRequest) PutItemRequest.builder().tableName(this.lockTableName).item(toNewItem(str, str2, heartbeatTimeoutMs())).conditionExpression(CONDITION_LOCK_ENTITY_NOT_EXIST).build());
        }
        startNewHeartbeat(str, str2);
    }

    private void startNewHeartbeat(String str, String str2) {
        if (this.heartbeats.containsKey(str)) {
            this.heartbeats.remove(str).cancel();
        }
        DynamoDbHeartbeat dynamoDbHeartbeat = new DynamoDbHeartbeat(this.dynamo, this.lockTableName, heartbeatIntervalMs(), heartbeatTimeoutMs(), str, str2);
        dynamoDbHeartbeat.schedule(scheduler());
        this.heartbeats.put(str, dynamoDbHeartbeat);
    }

    @Override // org.apache.iceberg.LockManager
    public boolean release(String str, String str2) {
        boolean z = false;
        DynamoDbHeartbeat dynamoDbHeartbeat = this.heartbeats.get(str);
        try {
            try {
                Tasks.foreach(str).retry(5).throwFailureWhenFinished().onlyRetryOn(ProvisionedThroughputExceededException.class, TransactionConflictException.class, RequestLimitExceededException.class, InternalServerErrorException.class).run(str3 -> {
                    this.dynamo.deleteItem((DeleteItemRequest) DeleteItemRequest.builder().tableName(this.lockTableName).key(toKey(str3)).conditionExpression(CONDITION_LOCK_ID_MATCH).expressionAttributeValues(toLockIdValues(str3, str2)).build());
                });
                z = true;
                if (dynamoDbHeartbeat != null && dynamoDbHeartbeat.ownerId().equals(str2)) {
                    dynamoDbHeartbeat.cancel();
                }
            } catch (DynamoDbException e) {
                LOG.error("Failed to release lock for entity: {}, owner: {}, encountered unexpected DynamoDB exception", new Object[]{str, str2, e});
                if (dynamoDbHeartbeat != null && dynamoDbHeartbeat.ownerId().equals(str2)) {
                    dynamoDbHeartbeat.cancel();
                }
            } catch (ConditionalCheckFailedException e2) {
                LOG.error("Failed to release lock for entity: {}, owner: {}, lock entity does not exist or owner not match", new Object[]{str, str2, e2});
                if (dynamoDbHeartbeat != null && dynamoDbHeartbeat.ownerId().equals(str2)) {
                    dynamoDbHeartbeat.cancel();
                }
            }
            return z;
        } catch (Throwable th) {
            if (dynamoDbHeartbeat != null && dynamoDbHeartbeat.ownerId().equals(str2)) {
                dynamoDbHeartbeat.cancel();
            }
            throw th;
        }
    }

    private static Map<String, AttributeValue> toKey(String str) {
        return ImmutableMap.of(COL_LOCK_ENTITY_ID, (AttributeValue) AttributeValue.builder().s(str).build());
    }

    private static Map<String, AttributeValue> toNewItem(String str, String str2, long j) {
        return ImmutableMap.of(COL_LOCK_ENTITY_ID, (AttributeValue) AttributeValue.builder().s(str).build(), COL_LOCK_OWNER_ID, (AttributeValue) AttributeValue.builder().s(str2).build(), COL_VERSION, (AttributeValue) AttributeValue.builder().s(UUID.randomUUID().toString()).build(), COL_LEASE_DURATION_MS, (AttributeValue) AttributeValue.builder().n(Long.toString(j)).build());
    }

    private static Map<String, AttributeValue> toLockIdValues(String str, String str2) {
        return ImmutableMap.of(":eid", (AttributeValue) AttributeValue.builder().s(str).build(), ":oid", (AttributeValue) AttributeValue.builder().s(str2).build());
    }

    @Override // org.apache.iceberg.util.LockManagers.BaseLockManager, java.lang.AutoCloseable
    public void close() throws Exception {
        this.dynamo.close();
        this.heartbeats.values().forEach((v0) -> {
            v0.cancel();
        });
        this.heartbeats.clear();
        super.close();
    }

    public static List<KeySchemaElement> lockTableSchema() {
        return LOCK_TABLE_SCHEMA;
    }

    public static List<AttributeDefinition> lockTableColDefinitions() {
        return LOCK_TABLE_COL_DEFINITIONS;
    }
}
