Skip to content

Conversation

@lihaosky
Copy link
Contributor

@lihaosky lihaosky commented Oct 13, 2025

What is the purpose of the change

Add table api support for Model and ml_predict function in https://cwiki.apache.org/confluence/display/FLINK/FLIP-526%3A+Model+ML_PREDICT%2C+ML_EVALUATE+Table+API

Brief change log

  • Add new Model interface and ModelImpl implementation for model and ml_predict
  • Add fromModelPath and from to construct Model from TableEnvironment
  • Add ModelReferenceExpression and handle it in QueryOperationConverter
  • Add anonymous model to ContextResolvedModel

Verifying this change

Unit and Integration test

Does this pull request potentially affect one of the following parts:

  • Dependencies (does it add or upgrade a dependency): (no)
  • The public API, i.e., is any changed class annotated with @Public(Evolving): (yes)
  • The serializers: (no)
  • The runtime per-record code paths (performance sensitive): (no)
  • Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Kubernetes/Yarn, ZooKeeper: (no)
  • The S3 file system connector: (no)

Documentation

  • Does this pull request introduce a new feature? (yes)
  • If yes, how is the feature documented? (JavaDocs)

@flinkbot
Copy link
Collaborator

flinkbot commented Oct 13, 2025

CI report:

Bot commands The @flinkbot bot supports the following commands:
  • @flinkbot run azure re-run the last Azure build

*
* @return the context resolved model metadata.
*/
ContextResolvedModel getModel();
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@twalthr , this forces ContextResolvedModel to be PublicEvolving. I guess one way to avoid it is to remove this method from Model and just add it in ModelImpl

@github-actions github-actions bot added the community-reviewed PR has been reviewed by the community. label Oct 22, 2025
Copy link
Member

@fsk119 fsk119 left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for your contribution. I left some comments.

* runtime configuration options such as max-concurrent-operations, timeout, and execution mode
* settings.
*
* <p>Common runtime options include:
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It's not easy for the community to maintain the javadoc about these options. How about we just link the MLPredictRuntimeConfigOptions.

// lit() is not serializable to sql.
if (options.isEmpty()) {
return tableEnvironment.fromCall(
"ML_PREDICT",
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

use org.apache.flink.table.functions.BuiltInFunctionDefinitions#ML_PREDICT.getName() instead?


@Override
public Table predict(Table table, ColumnList inputColumns, Map<String, String> options) {
// Use Expressions.map() instead of Expression.lit() to create a MAP literal since
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

typo: Expressions.lit() ?


private final String name;
private final ContextResolvedModel model;
private final TableEnvironment env;
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why do we need TableEnv here?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

But I see FieldReferenceExpression doesn't have table environment. I don't see any need to include the table env. How about we relaxing the limit and add it back when we really need it.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.


public abstract R visit(TableReferenceExpression tableReference);

public abstract R visit(ModelReferenceExpression modelReferenceExpression);
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Add else if branch in line 30?

new int[0]);
inputStack.add(relBuilder.build());
return tableArgCall;
} else if (resolvedArg
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't think it's a good idea to use instance of here. It's better we reuse ExpressionConverter to convert these expression. How about let ExpressionConverterextends ResolvedExpressionVisitor<RexNode>?

Copy link
Member

@fsk119 fsk119 left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I left some comments

throw new ValidationException("Anonymous models cannot be serialized.");
}

return "MODEL " + model.getIdentifier().asSerializableString();
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It seems the name is lost after searilization. I am not sure whether we need to restore the object from the string

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The name are not used for TableReferenceExpression and literal argument name as well. These are not serialized as named argument call. I don't see this is restored from serialized string, looks mainly to convert it sql query looks


private final String name;
private final ContextResolvedModel model;
private final TableEnvironment env;
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

But I see FieldReferenceExpression doesn't have table environment. I don't see any need to include the table env. How about we relaxing the limit and add it back when we really need it.

this.typeFactory = (FlinkTypeFactory) relBuilder.getRexBuilder().getTypeFactory();
this.dataTypeFactory =
unwrapContext(relBuilder.getCluster()).getCatalogManager().getDataTypeFactory();
this.inputStack = new java.util.ArrayList<>();
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this.inputStack = new ArrayList<>();

}
final RexTableArgCall tableArgCall =
new RexTableArgCall(rowType, inputStack.size(), partitionKeys, new int[0]);
inputStack.add(relBuilder.build());
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can we maintain the stack and do the relBuilder.build() in the QueryOperationConverter?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It's hard to do since inputStack depends on this call. Maybe that's why TableReferenceExpression was handled in QueryOperationConverter in the first place

"from",
"registerFunction",
"fromCall",
"fromModelPath",
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Create a tickect about this.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

community-reviewed PR has been reviewed by the community.

Projects

None yet

Development

Successfully merging this pull request may close these issues.

3 participants