[FLINK-36540][Runtime] Add Support for Hadoop Caller Context when using Flink to operate hdfs. #26681
base: master
Conversation
@dmvk @xintongsong @ferenc-csaky
<td><h5>hdfs.caller-context.enabled</h5></td>
<td style="word-wrap: break-word;">false</td>
<td>Boolean</td>
<td>A config of whether hadoop caller context is enabled.</td>
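For reference, enabling the option discussed here would be a one-line config change (a sketch; the option name is taken from the generated docs table above, the file location is the usual Flink configuration file):

```yaml
# flink-conf.yaml fragment (illustrative): turn on the Hadoop caller context
hdfs.caller-context.enabled: true
```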
It would be more readable to say:
Whether hadoop caller context is enabled.
I forgot to push my latest branch, sorry about that.
* used, such as caller context or other metadata.
*/
@Experimental
public interface ContextWrapperFileSystem {
I wonder if we need the words wrap and wrapper. It would be simpler (and more intuitive?) to have ContextFileSystem and the method as addContext. WDYT?
agree
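A minimal sketch of the suggested rename (all names here are the reviewer's hypothetical suggestion, not the interface actually merged in the PR):

```java
// Hypothetical sketch of the reviewer's suggestion: drop the "wrap"/"wrapper"
// wording and expose a ContextFileSystem interface with an addContext method.
public interface ContextFileSystem {
    // Attach caller-context metadata (e.g. a Hadoop caller-context string)
    // to subsequent file-system operations performed by this instance.
    void addContext(String context);
}
```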
CONTEXTS.set(newContext);
}

static FileSystem wrapWithContextWhenActivated(FileSystem fs) {
What does WhenActivated mean here? Maybe explain it in a comment if it is important. Otherwise, could we not just say addContext?
agree
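The guard the method name implies can be sketched as follows. This is a self-contained toy: the FileSystem and wrapper classes below are stand-ins for illustration, not Flink's actual classes, and the boolean flag is an assumed parameter.

```java
// Toy stand-in for Flink's FileSystem abstraction (illustrative only).
class FileSystem {}

// Hypothetical wrapper that would attach caller-context handling.
class ContextWrappingFileSystem extends FileSystem {
    final FileSystem delegate;
    ContextWrappingFileSystem(FileSystem delegate) { this.delegate = delegate; }
}

class FileSystems {
    // Wrap only when the caller-context feature is activated; otherwise
    // return the original instance, so users who do not enable the
    // feature (or do not use HDFS) are unaffected.
    static FileSystem wrapWithContextWhenActivated(FileSystem fs, boolean enabled) {
        return enabled ? new ContextWrappingFileSystem(fs) : fs;
    }
}
```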
context = context + "_local";
}
context = context + "JobID_" + jobID;
FileSystemContext.initializeContextForThread(context);
I was thinking some file systems would have contexts and some would not, yet the code does context processing even when the file system might not have a context. Have I understood this correctly?
Yes, currently only HadoopFileSystem uses this context.
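The per-thread holder that initializeContextForThread suggests can be sketched like this (a hypothetical reconstruction from the CONTEXTS.set line in the diff; the actual FileSystemContext class in the PR may differ):

```java
// Hypothetical sketch of a per-thread caller-context holder.
final class FileSystemContext {
    private static final ThreadLocal<String> CONTEXTS = new ThreadLocal<>();

    // Record the caller context for the current thread; a context-aware
    // file system (e.g. the Hadoop one) can read it before each operation.
    static void initializeContextForThread(String context) {
        CONTEXTS.set(context);
    }

    static String currentContext() {
        return CONTEXTS.get();
    }

    // Clearing after use avoids ThreadLocal leaks on pooled threads,
    // which the PR description calls out as a design goal.
    static void clear() {
        CONTEXTS.remove();
    }
}
```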
Please add unit tests
@@ -115,6 +116,21 @@ public void run() {
checkpointMetaData.getCheckpointId(),
asyncStartDelayMillis);

String context = "FLINK";
Shall we put this into a util class?
good idea
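Such a util method could look like the following sketch, mirroring the string construction in the diffs above exactly (including the bare "JobID_" concatenation); the class and method names are hypothetical:

```java
// Hypothetical util class collecting the caller-context string construction
// that currently appears inline in the task code.
final class CallerContextUtil {
    static String buildCallerContext(String jobID, String taskName, boolean isLocal) {
        String context = "FLINK";
        if (isLocal) {
            // "_local" marker for local execution, as in the diff above.
            context = context + "_local";
        }
        return context + "JobID_" + jobID + "_TaskName_" + taskName;
    }
}
```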
} else {
context = context + "_local";
}
context = context + "JobID_" + taskEnvironment.getJobID() + "_TaskName_" + taskName;
Compared to using the JobID only, it would be better to use both the job name and the job ID for readability.
I disagree with this. Job names may contain special characters such as spaces.
In my case, I want to load this context into a structured table for further analysis, so I believe the job ID is sufficient.
If we need to find the exact job name, we can always look it up in the History Server.
Thanks for the contribution. We are also waiting for this feature.
@flinkbot run azure
Is the CI failure related to this change? If not, let's rebase it onto the latest master.
Thanks for your reply @ferenc-csaky
What is the purpose of the change
As described in FLINK-36540.
When we use Flink to delete, write, or modify files on a Hadoop filesystem, the caller context is a helpful feature for tracing who performed an operation or counting how many files an application creates on the Hadoop filesystem. UGI alone is not good enough for tracing these operations: if a tenant has many jobs writing into HDFS, we cannot find out which job caused the breakdown of HDFS.
I created a new interface and class in the flink-core module, so that it does not leak ThreadLocal values and does not affect deployments that do not use HDFS.
What's more, by combining this new feature with the history JSON files in the history server, we can calculate how many read and write operations a Flink application performed against HDFS, and find out whether there is pressure or a bottleneck on HDFS file operations.
Brief change log
Verifying this change
Please make sure both new and modified tests in this PR follow the conventions for tests defined in our code quality guide.
This change added tests and can be verified as follows:
I rebuilt the project and tested the new jar file in my cluster; it prints the correct caller context as expected:

Does this pull request potentially affect one of the following parts:
@Public(Evolving): (yes / no)
Documentation