NIFI-15862: Moved Processors to using Virtual Threads with a Semaphor…#11164
NIFI-15862: Moved Processors to using Virtual Threads with a Semaphor…#11164markap14 wants to merge 2 commits intoapache:mainfrom
Conversation
00c6167 to
c0cf8b2
Compare
| private void runOnce(final Connectable connectable, final ConnectableTask connectableTask, final Callable<Future<Void>> stopCallback) { | ||
| try { | ||
| try { | ||
| globalSemaphore.acquire(); |
There was a problem hiding this comment.
Should we use acquirePermitWithPolling?
There was a problem hiding this comment.
Use of .acquire is fine here because there's no way to explicitly stop a Processor when it's triggered with RunOnce anyway so doing it with polling doesn't buy us anything.
| public void setMaxThreadCount(final int maxThreads) { | ||
| globalSemaphore.setMaxPermits(maxThreads); | ||
| logger.info("Global semaphore permits updated to {}", maxThreads); | ||
| } |
There was a problem hiding this comment.
incrementMaxThreadCount is synchronized but setMaxThreadCount is not. Feels like we could have an update race condition, no?
There was a problem hiding this comment.
Yeah, that's a good catch!
| cronExpression = null; | ||
| } | ||
|
|
||
| lifecycleState.setScheduled(true); |
There was a problem hiding this comment.
Should we have a try/catch here? Something like try { ... } catch (Throwable) { lifecycleState.setScheduled(false); ...; throw; }. If there is a problem with the loop after, we would leave it as scheduled, no?
| writeLock.lock(); | ||
| try { | ||
| setMaxThreadCount(maxThreadCount, "Timer Driven", this.timerDrivenEngineRef.get(), this.maxTimerDrivenThreads); | ||
| virtualThreadSchedulingAgent.setMaxThreadCount(maxThreadCount); |
There was a problem hiding this comment.
IIUC we now use the maxThreadCount value in two different places: the FlowEngine thread pool and the semaphore for the VT agent. Should we have separate properties?
There was a problem hiding this comment.
Yeah, I had this fixed locally but forgot to push the commit. Now, the timer-driven engine is used only for framework tasks, so I renamed it and just hardcoded it to 8 threads.
|
|
||
| import static java.util.Objects.requireNonNull; | ||
|
|
||
| public class FlowController implements ReportingTaskProvider, FlowAnalysisRuleProvider, Authorizable, NodeTypeProvider { |
There was a problem hiding this comment.
IIUC you didn't make any change on the shutdown path so we do not attempt to interrupt in-flight component virtual threads and the kill of the JVM will just kill threads. That seems wrong, no?
There was a problem hiding this comment.
Good catch, will ensure that they are interrupted.
…e bounding how many tasks can be run at once
c0cf8b2 to
64459cf
Compare
64459cf to
8f34b35
Compare
…e bounding how many tasks can be run at once
Summary
NIFI-00000
Tracking
Please complete the following tracking steps prior to pull request creation.
Issue Tracking
Pull Request Tracking
NIFI-00000NIFI-00000VerifiedstatusPull Request Formatting
mainbranchVerification
Please indicate the verification steps performed prior to pull request creation.
Build
./mvnw clean install -P contrib-checkLicensing
LICENSEandNOTICEfilesDocumentation