NIFI-15862: Moved Processors to using Virtual Threads with a Semaphor…#11164

Open
markap14 wants to merge 2 commits into apache:main from markap14:virtual-threads

@markap14
Contributor

…e bounding how many tasks can be run at once

Summary

NIFI-00000

Tracking

Please complete the following tracking steps prior to pull request creation.

Issue Tracking

Pull Request Tracking

  • Pull Request title starts with Apache NiFi Jira issue number, such as NIFI-00000
  • Pull Request commit message starts with Apache NiFi Jira issue number, such as NIFI-00000
  • Pull request contains commits signed with a registered key indicating Verified status

Pull Request Formatting

  • Pull Request based on current revision of the main branch
  • Pull Request refers to a feature branch with one commit containing changes

Verification

Please indicate the verification steps performed prior to pull request creation.

Build

  • Build completed using ./mvnw clean install -P contrib-check
    • JDK 21
    • JDK 25

Licensing

  • New dependencies are compatible with the Apache License 2.0 according to the License Policy
  • New dependencies are documented in applicable LICENSE and NOTICE files

Documentation

  • Documentation formatting appears as expected in rendered files

private void runOnce(final Connectable connectable, final ConnectableTask connectableTask, final Callable<Future<Void>> stopCallback) {
    try {
        try {
            globalSemaphore.acquire();
Contributor

Should we use acquirePermitWithPolling?

Contributor Author

Use of .acquire is fine here: there's no way to explicitly stop a Processor once it's triggered with RunOnce anyway, so polling doesn't buy us anything.
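For readers following the thread, the trade-off between the two acquisition styles can be sketched roughly as below. This is only an illustration: `PermitAcquisition`, `acquireWithPolling`, and `acquireBlocking` are hypothetical names, and the PR's actual `acquirePermitWithPolling` may look quite different. The only library calls used are `Semaphore.acquire()` and `Semaphore.tryAcquire(long, TimeUnit)`.

```java
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
import java.util.function.BooleanSupplier;

// Hypothetical helper class, not taken from the PR.
final class PermitAcquisition {
    private PermitAcquisition() {
    }

    // Polling style: wait for a permit in short intervals and give up as soon
    // as shouldStop reports true. Returns true only if a permit was obtained.
    static boolean acquireWithPolling(final Semaphore semaphore, final BooleanSupplier shouldStop, final long pollMillis) {
        try {
            while (!shouldStop.getAsBoolean()) {
                if (semaphore.tryAcquire(pollMillis, TimeUnit.MILLISECONDS)) {
                    return true;
                }
            }
            return false;
        } catch (final InterruptedException e) {
            Thread.currentThread().interrupt(); // preserve the interrupt for the caller
            return false;
        }
    }

    // Blocking style: appropriate when nothing can ask the task to stop early,
    // which is the run-once situation described in the reply above.
    static boolean acquireBlocking(final Semaphore semaphore) {
        try {
            semaphore.acquire();
            return true;
        } catch (final InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        }
    }
}
```

Polling only pays off when some other party can flip the stop condition while the caller is waiting; without such a signal, the blocking call does the same work with less code.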

Comment on lines +190 to +193
public void setMaxThreadCount(final int maxThreads) {
    globalSemaphore.setMaxPermits(maxThreads);
    logger.info("Global semaphore permits updated to {}", maxThreads);
}
Contributor

incrementMaxThreadCount is synchronized but setMaxThreadCount is not. Feels like we could have an update race condition, no?

Contributor Author

Yeah, that's a good catch!
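One way to close that race is to guard every update of the permit limit with the same monitor. The sketch below assumes a hypothetical `ResizableSemaphore` built on `java.util.concurrent.Semaphore`; it is not the semaphore class used in the PR, and the method names only mirror the ones mentioned in the comment above.

```java
import java.util.concurrent.Semaphore;

// Hypothetical resizable semaphore, for illustration only.
final class ResizableSemaphore extends Semaphore {
    private int maxPermits; // guarded by "this"

    ResizableSemaphore(final int maxPermits) {
        super(maxPermits, true);
        this.maxPermits = maxPermits;
    }

    // Absolute update. Synchronized so it cannot interleave with
    // incrementMaxPermits and lose or double-apply a delta.
    synchronized void setMaxPermits(final int newMax) {
        if (newMax < 0) {
            throw new IllegalArgumentException("newMax must be non-negative");
        }
        final int delta = newMax - maxPermits;
        if (delta > 0) {
            release(delta);        // make the extra permits available
        } else if (delta < 0) {
            reducePermits(-delta); // protected Semaphore method; never blocks
        }
        maxPermits = newMax;
    }

    // Relative update. Holds the same monitor, so the read of maxPermits and
    // the write in setMaxPermits happen as one atomic step.
    synchronized void incrementMaxPermits(final int by) {
        setMaxPermits(maxPermits + by);
    }

    synchronized int getMaxPermits() {
        return maxPermits;
    }
}
```

If only one of the two methods were synchronized, an absolute set could land between the read and the write of an increment, leaving the recorded maximum out of step with the permits actually handed out.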

cronExpression = null;
}

lifecycleState.setScheduled(true);
Contributor

Should we have a try/catch here? Something like try { ... } catch (Throwable t) { lifecycleState.setScheduled(false); ...; throw t; }. If the loop that follows fails, we would leave the component marked as scheduled, no?
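A minimal sketch of the rollback being suggested, under the assumption that the scheduled flag can be modeled as a simple boolean. `ScheduleRollbackSketch` and its `startTasks` parameter are hypothetical stand-ins for the lifecycle state and the task-launching loop, not code from the PR.

```java
import java.util.concurrent.atomic.AtomicBoolean;

// Hypothetical stand-in for the scheduling code under review.
final class ScheduleRollbackSketch {
    // Models the "scheduled" flag held by the lifecycle state.
    private final AtomicBoolean scheduled = new AtomicBoolean(false);

    boolean isScheduled() {
        return scheduled.get();
    }

    // startTasks stands in for the loop that launches the component's tasks.
    void schedule(final Runnable startTasks) {
        scheduled.set(true);
        try {
            startTasks.run();
        } catch (final Throwable t) {
            // Undo the flag so the component is not left marked as scheduled
            // when none of its tasks are actually running.
            scheduled.set(false);
            throw t; // precise rethrow: only unchecked throwables can reach here
        }
    }
}
```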

writeLock.lock();
try {
    setMaxThreadCount(maxThreadCount, "Timer Driven", this.timerDrivenEngineRef.get(), this.maxTimerDrivenThreads);
    virtualThreadSchedulingAgent.setMaxThreadCount(maxThreadCount);
Contributor

IIUC we now use the maxThreadCount value in two different places: the FlowEngine thread pool and the semaphore for the VT agent. Should we have separate properties?

Contributor Author

Yeah, I had this fixed locally but forgot to push the commit. Now, the timer-driven engine is used only for framework tasks, so I renamed it and just hardcoded it to 8 threads.


import static java.util.Objects.requireNonNull;

public class FlowController implements ReportingTaskProvider, FlowAnalysisRuleProvider, Authorizable, NodeTypeProvider {
Contributor

IIUC you didn't change the shutdown path, so we never attempt to interrupt in-flight component virtual threads, and they will simply be killed when the JVM exits. That seems wrong, no?

Contributor Author

Good catch, will ensure that they are interrupted.
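One possible shape for that fix, sketched under assumptions: keep a set of the threads currently running component tasks and interrupt them on shutdown, then wait a bounded time for them to exit. `InFlightTaskTracker` is a hypothetical class, not the PR's implementation. It takes a plain `ThreadFactory` so it runs on any JDK; on JDK 21 or later, `Thread.ofVirtual().factory()` could be passed to get virtual threads.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Objects;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;

// Hypothetical tracker for in-flight component tasks.
final class InFlightTaskTracker {
    private final ThreadFactory threadFactory;
    private final Set<Thread> running = ConcurrentHashMap.newKeySet();

    InFlightTaskTracker(final ThreadFactory threadFactory) {
        this.threadFactory = Objects.requireNonNull(threadFactory);
    }

    // Starts the task on a new thread and tracks that thread until the task ends.
    Thread start(final Runnable task) {
        final Thread thread = Objects.requireNonNull(threadFactory.newThread(() -> {
            try {
                task.run();
            } finally {
                running.remove(Thread.currentThread());
            }
        }));
        running.add(thread); // register before start so shutdown cannot miss it
        thread.start();
        return thread;
    }

    // Interrupts every tracked thread, then waits up to timeoutMillis in total.
    // Returns true if all interrupted threads have terminated by then.
    boolean shutdown(final long timeoutMillis) {
        final long deadline = System.nanoTime() + TimeUnit.MILLISECONDS.toNanos(timeoutMillis);
        final List<Thread> snapshot = new ArrayList<>(running);
        for (final Thread thread : snapshot) {
            thread.interrupt();
        }
        try {
            for (final Thread thread : snapshot) {
                final long remainingMillis = TimeUnit.NANOSECONDS.toMillis(deadline - System.nanoTime());
                if (remainingMillis <= 0) {
                    break; // out of time; join(0) would wait forever
                }
                thread.join(remainingMillis);
            }
        } catch (final InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return snapshot.stream().noneMatch(Thread::isAlive);
    }
}
```

Interruption is cooperative, so this only helps tasks that respond to it, for example by blocking in an interruptible call or checking the interrupt flag; the bounded wait keeps shutdown from hanging on tasks that do not.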
