NIFI-15862: Moved Processors to using Virtual Threads with a Semaphor…#11164
NIFI-15862: Moved Processors to using Virtual Threads with a Semaphor…#11164markap14 wants to merge 1 commit intoapache:mainfrom
Conversation
…e bounding how many tasks can be run at once
00c6167 to
c0cf8b2
Compare
| private void runOnce(final Connectable connectable, final ConnectableTask connectableTask, final Callable<Future<Void>> stopCallback) { | ||
| try { | ||
| try { | ||
| globalSemaphore.acquire(); |
There was a problem hiding this comment.
Should we use acquirePermitWithPolling?
| public void setMaxThreadCount(final int maxThreads) { | ||
| globalSemaphore.setMaxPermits(maxThreads); | ||
| logger.info("Global semaphore permits updated to {}", maxThreads); | ||
| } |
There was a problem hiding this comment.
incrementMaxThreadCount is synchronized but setMaxThreadCount is not. Feels like we could have an update race condition, no?
| cronExpression = null; | ||
| } | ||
|
|
||
| lifecycleState.setScheduled(true); |
There was a problem hiding this comment.
Should we have a try/catch here? Something like try { ... } catch (Throwable) { lifecycleState.setScheduled(false); ...; throw; }. If there is a problem with the loop after, we would leave it as scheduled, no?
| writeLock.lock(); | ||
| try { | ||
| setMaxThreadCount(maxThreadCount, "Timer Driven", this.timerDrivenEngineRef.get(), this.maxTimerDrivenThreads); | ||
| virtualThreadSchedulingAgent.setMaxThreadCount(maxThreadCount); |
There was a problem hiding this comment.
IIUC we now use the maxThreadCount value in two different places: the FlowEngine thread pool and the semaphore for the VT agent. Should we have separate properties?
|
|
||
| import static java.util.Objects.requireNonNull; | ||
|
|
||
| public class FlowController implements ReportingTaskProvider, FlowAnalysisRuleProvider, Authorizable, NodeTypeProvider { |
There was a problem hiding this comment.
IIUC you didn't make any change on the shutdown path so we do not attempt to interrupt in-flight component virtual threads and the kill of the JVM will just kill threads. That seems wrong, no?
…e bounding how many tasks can be run at once
Summary
NIFI-00000
Tracking
Please complete the following tracking steps prior to pull request creation.
Issue Tracking
Pull Request Tracking
NIFI-00000NIFI-00000VerifiedstatusPull Request Formatting
mainbranchVerification
Please indicate the verification steps performed prior to pull request creation.
Build
./mvnw clean install -P contrib-checkLicensing
LICENSEandNOTICEfilesDocumentation