package org.apache.flink.table.api.bridge.java.internal;

import java.net.URL;
import java.util.Arrays;
import java.util.Optional;
import org.apache.flink.annotation.Internal;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.DataTypes;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.Schema;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.TableConfig;
import org.apache.flink.table.api.bridge.internal.AbstractStreamTableEnvironmentImpl;
import org.apache.flink.table.api.bridge.java.StreamStatementSet;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.table.catalog.CatalogManager;
import org.apache.flink.table.catalog.FunctionCatalog;
import org.apache.flink.table.catalog.GenericInMemoryCatalog;
import org.apache.flink.table.catalog.SchemaTranslator;
import org.apache.flink.table.connector.ChangelogMode;
import org.apache.flink.table.delegation.Executor;
import org.apache.flink.table.delegation.Planner;
import org.apache.flink.table.expressions.Expression;
import org.apache.flink.table.factories.PlannerFactoryUtil;
import org.apache.flink.table.functions.AggregateFunction;
import org.apache.flink.table.functions.TableAggregateFunction;
import org.apache.flink.table.functions.TableFunction;
import org.apache.flink.table.functions.UserDefinedFunctionHelper;
import org.apache.flink.table.module.ModuleManager;
import org.apache.flink.table.operations.OutputConversionModifyOperation;
import org.apache.flink.table.resource.ResourceManager;
import org.apache.flink.table.sources.TableSource;
import org.apache.flink.table.sources.TableSourceValidation;
import org.apache.flink.table.types.AbstractDataType;
import org.apache.flink.table.types.utils.TypeConversions;
import org.apache.flink.types.Row;
import org.apache.flink.util.FlinkUserCodeClassLoaders;
import org.apache.flink.util.Preconditions;

@Internal
/* loaded from: input_file:org/apache/flink/table/api/bridge/java/internal/StreamTableEnvironmentImpl.class */
public final class StreamTableEnvironmentImpl extends AbstractStreamTableEnvironmentImpl implements StreamTableEnvironment {
    public StreamTableEnvironmentImpl(CatalogManager catalogManager, ModuleManager moduleManager, ResourceManager resourceManager, FunctionCatalog functionCatalog, TableConfig tableConfig, StreamExecutionEnvironment streamExecutionEnvironment, Planner planner, Executor executor, boolean z) {
        super(catalogManager, moduleManager, resourceManager, tableConfig, executor, functionCatalog, planner, z, streamExecutionEnvironment);
    }

    public static StreamTableEnvironment create(StreamExecutionEnvironment streamExecutionEnvironment, EnvironmentSettings environmentSettings) {
        ClassLoader create = FlinkUserCodeClassLoaders.create(new URL[0], environmentSettings.getUserClassLoader(), environmentSettings.getConfiguration());
        Executor lookupExecutor = lookupExecutor(create, streamExecutionEnvironment);
        TableConfig tableConfig = TableConfig.getDefault();
        tableConfig.setRootConfiguration(lookupExecutor.getConfiguration());
        tableConfig.addConfiguration(environmentSettings.getConfiguration());
        ResourceManager resourceManager = new ResourceManager(environmentSettings.getConfiguration(), create);
        ModuleManager moduleManager = new ModuleManager();
        CatalogManager build = CatalogManager.newBuilder().classLoader(create).config(tableConfig).defaultCatalog(environmentSettings.getBuiltInCatalogName(), new GenericInMemoryCatalog(environmentSettings.getBuiltInCatalogName(), environmentSettings.getBuiltInDatabaseName())).executionConfig(streamExecutionEnvironment.getConfig()).build();
        FunctionCatalog functionCatalog = new FunctionCatalog(tableConfig, resourceManager, build, moduleManager);
        return new StreamTableEnvironmentImpl(build, moduleManager, resourceManager, functionCatalog, tableConfig, streamExecutionEnvironment, PlannerFactoryUtil.createPlanner(lookupExecutor, tableConfig, create, moduleManager, build, functionCatalog), lookupExecutor, environmentSettings.isStreamingMode());
    }

    @Override // org.apache.flink.table.api.bridge.java.StreamTableEnvironment
    public <T> void registerFunction(String str, TableFunction<T> tableFunction) {
        this.functionCatalog.registerTempSystemTableFunction(str, tableFunction, UserDefinedFunctionHelper.getReturnTypeOfTableFunction(tableFunction));
    }

    @Override // org.apache.flink.table.api.bridge.java.StreamTableEnvironment
    public <T, ACC> void registerFunction(String str, AggregateFunction<T, ACC> aggregateFunction) {
        this.functionCatalog.registerTempSystemAggregateFunction(str, aggregateFunction, UserDefinedFunctionHelper.getReturnTypeOfAggregateFunction(aggregateFunction), UserDefinedFunctionHelper.getAccumulatorTypeOfAggregateFunction(aggregateFunction));
    }

    @Override // org.apache.flink.table.api.bridge.java.StreamTableEnvironment
    public <T, ACC> void registerFunction(String str, TableAggregateFunction<T, ACC> tableAggregateFunction) {
        this.functionCatalog.registerTempSystemAggregateFunction(str, tableAggregateFunction, UserDefinedFunctionHelper.getReturnTypeOfAggregateFunction(tableAggregateFunction), UserDefinedFunctionHelper.getAccumulatorTypeOfAggregateFunction(tableAggregateFunction));
    }

    @Override // org.apache.flink.table.api.bridge.java.StreamTableEnvironment
    public <T> Table fromDataStream(DataStream<T> dataStream) {
        return fromStreamInternal(dataStream, null, null, ChangelogMode.insertOnly());
    }

    @Override // org.apache.flink.table.api.bridge.java.StreamTableEnvironment
    public <T> Table fromDataStream(DataStream<T> dataStream, Schema schema) {
        Preconditions.checkNotNull(schema, "Schema must not be null.");
        return fromStreamInternal(dataStream, schema, null, ChangelogMode.insertOnly());
    }

    @Override // org.apache.flink.table.api.bridge.java.StreamTableEnvironment
    public Table fromChangelogStream(DataStream<Row> dataStream) {
        return fromStreamInternal(dataStream, null, null, ChangelogMode.all());
    }

    @Override // org.apache.flink.table.api.bridge.java.StreamTableEnvironment
    public Table fromChangelogStream(DataStream<Row> dataStream, Schema schema) {
        Preconditions.checkNotNull(schema, "Schema must not be null.");
        return fromStreamInternal(dataStream, schema, null, ChangelogMode.all());
    }

    @Override // org.apache.flink.table.api.bridge.java.StreamTableEnvironment
    public Table fromChangelogStream(DataStream<Row> dataStream, Schema schema, ChangelogMode changelogMode) {
        Preconditions.checkNotNull(schema, "Schema must not be null.");
        return fromStreamInternal(dataStream, schema, null, changelogMode);
    }

    @Override // org.apache.flink.table.api.bridge.java.StreamTableEnvironment
    public <T> void createTemporaryView(String str, DataStream<T> dataStream) {
        createTemporaryView(str, fromStreamInternal(dataStream, null, str, ChangelogMode.insertOnly()));
    }

    @Override // org.apache.flink.table.api.bridge.java.StreamTableEnvironment
    public <T> void createTemporaryView(String str, DataStream<T> dataStream, Schema schema) {
        createTemporaryView(str, fromStreamInternal(dataStream, schema, str, ChangelogMode.insertOnly()));
    }

    @Override // org.apache.flink.table.api.bridge.java.StreamTableEnvironment
    public DataStream<Row> toDataStream(Table table) {
        Preconditions.checkNotNull(table, "Table must not be null.");
        return toDataStream(table, table.getResolvedSchema().toSourceRowDataType());
    }

    @Override // org.apache.flink.table.api.bridge.java.StreamTableEnvironment
    public <T> DataStream<T> toDataStream(Table table, Class<T> cls) {
        Preconditions.checkNotNull(table, "Table must not be null.");
        Preconditions.checkNotNull(cls, "Target class must not be null.");
        return cls == Row.class ? (DataStream<T>) toDataStream(table) : toDataStream(table, DataTypes.of((Class<?>) cls));
    }

    @Override // org.apache.flink.table.api.bridge.java.StreamTableEnvironment
    public <T> DataStream<T> toDataStream(Table table, AbstractDataType<?> abstractDataType) {
        Preconditions.checkNotNull(table, "Table must not be null.");
        Preconditions.checkNotNull(abstractDataType, "Target data type must not be null.");
        return toStreamInternal(table, SchemaTranslator.createProducingResult(getCatalogManager().getDataTypeFactory(), table.getResolvedSchema(), abstractDataType), ChangelogMode.insertOnly());
    }

    @Override // org.apache.flink.table.api.bridge.java.StreamTableEnvironment
    public DataStream<Row> toChangelogStream(Table table) {
        Preconditions.checkNotNull(table, "Table must not be null.");
        return toStreamInternal(table, SchemaTranslator.createProducingResult(table.getResolvedSchema(), null), null);
    }

    @Override // org.apache.flink.table.api.bridge.java.StreamTableEnvironment
    public DataStream<Row> toChangelogStream(Table table, Schema schema) {
        Preconditions.checkNotNull(table, "Table must not be null.");
        Preconditions.checkNotNull(schema, "Target schema must not be null.");
        return toStreamInternal(table, SchemaTranslator.createProducingResult(table.getResolvedSchema(), schema), null);
    }

    @Override // org.apache.flink.table.api.bridge.java.StreamTableEnvironment
    public DataStream<Row> toChangelogStream(Table table, Schema schema, ChangelogMode changelogMode) {
        Preconditions.checkNotNull(table, "Table must not be null.");
        Preconditions.checkNotNull(schema, "Target schema must not be null.");
        Preconditions.checkNotNull(changelogMode, "Changelog mode must not be null.");
        return toStreamInternal(table, SchemaTranslator.createProducingResult(table.getResolvedSchema(), schema), changelogMode);
    }

    @Override // org.apache.flink.table.api.internal.TableEnvironmentImpl, org.apache.flink.table.api.TableEnvironment
    public StreamStatementSet createStatementSet() {
        return new StreamStatementSetImpl(this);
    }

    @Override // org.apache.flink.table.api.bridge.java.StreamTableEnvironment
    public <T> Table fromDataStream(DataStream<T> dataStream, Expression... expressionArr) {
        return createTable(asQueryOperation(dataStream, Optional.of(Arrays.asList(expressionArr))));
    }

    @Override // org.apache.flink.table.api.bridge.java.StreamTableEnvironment
    public <T> void registerDataStream(String str, DataStream<T> dataStream) {
        createTemporaryView(str, dataStream);
    }

    @Override // org.apache.flink.table.api.bridge.java.StreamTableEnvironment
    public <T> void createTemporaryView(String str, DataStream<T> dataStream, Expression... expressionArr) {
        createTemporaryView(str, fromDataStream(dataStream, expressionArr));
    }

    @Override // org.apache.flink.table.api.bridge.java.StreamTableEnvironment
    public <T> DataStream<T> toAppendStream(Table table, Class<T> cls) {
        return toAppendStream(table, extractTypeInformation(table, cls));
    }

    @Override // org.apache.flink.table.api.bridge.java.StreamTableEnvironment
    public <T> DataStream<T> toAppendStream(Table table, TypeInformation<T> typeInformation) {
        return toStreamInternal(table, new OutputConversionModifyOperation(table.getQueryOperation(), TypeConversions.fromLegacyInfoToDataType((TypeInformation<?>) typeInformation), OutputConversionModifyOperation.UpdateMode.APPEND));
    }

    @Override // org.apache.flink.table.api.bridge.java.StreamTableEnvironment
    public <T> DataStream<Tuple2<Boolean, T>> toRetractStream(Table table, Class<T> cls) {
        return toRetractStream(table, extractTypeInformation(table, cls));
    }

    @Override // org.apache.flink.table.api.bridge.java.StreamTableEnvironment
    public <T> DataStream<Tuple2<Boolean, T>> toRetractStream(Table table, TypeInformation<T> typeInformation) {
        return toStreamInternal(table, new OutputConversionModifyOperation(table.getQueryOperation(), wrapWithChangeFlag(typeInformation), OutputConversionModifyOperation.UpdateMode.RETRACT));
    }

    /* JADX INFO: Access modifiers changed from: protected */
    @Override // org.apache.flink.table.api.internal.TableEnvironmentImpl
    public void validateTableSource(TableSource<?> tableSource) {
        super.validateTableSource(tableSource);
        validateTimeCharacteristic(TableSourceValidation.hasRowtimeAttribute(tableSource));
    }
}
