Skip to content

NIFI-15862: Moved Processors to using Virtual Threads with a Semaphor…#11164

Open
markap14 wants to merge 1 commit intoapache:mainfrom
markap14:virtual-threads
Open

NIFI-15862: Moved Processors to using Virtual Threads with a Semaphor…#11164
markap14 wants to merge 1 commit intoapache:mainfrom
markap14:virtual-threads

Conversation

@markap14
Copy link
Copy Markdown
Contributor

…e bounding how many tasks can be run at once

Summary

NIFI-00000

Tracking

Please complete the following tracking steps prior to pull request creation.

Issue Tracking

Pull Request Tracking

  • Pull Request title starts with Apache NiFi Jira issue number, such as NIFI-00000
  • Pull Request commit message starts with Apache NiFi Jira issue number, as such NIFI-00000
  • Pull request contains commits signed with a registered key indicating Verified status

Pull Request Formatting

  • Pull Request based on current revision of the main branch
  • Pull Request refers to a feature branch with one commit containing changes

Verification

Please indicate the verification steps performed prior to pull request creation.

Build

  • Build completed using ./mvnw clean install -P contrib-check
    • JDK 21
    • JDK 25

Licensing

  • New dependencies are compatible with the Apache License 2.0 according to the License Policy
  • New dependencies are documented in applicable LICENSE and NOTICE files

Documentation

  • Documentation formatting appears as expected in rendered files

…e bounding how many tasks can be run at once
private void runOnce(final Connectable connectable, final ConnectableTask connectableTask, final Callable<Future<Void>> stopCallback) {
try {
try {
globalSemaphore.acquire();
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should we use acquirePermitWithPolling?

Comment on lines +190 to +193
public void setMaxThreadCount(final int maxThreads) {
globalSemaphore.setMaxPermits(maxThreads);
logger.info("Global semaphore permits updated to {}", maxThreads);
}
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

incrementMaxThreadCount is synchronized but setMaxThreadCount is not. Feels like we could have an update race condition, no?

cronExpression = null;
}

lifecycleState.setScheduled(true);
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should we have a try/catch here? Something like try { ... } catch (Throwable) { lifecycleState.setScheduled(false); ...; throw; }. If there is a problem with the loop after, we would leave it as scheduled, no?

writeLock.lock();
try {
setMaxThreadCount(maxThreadCount, "Timer Driven", this.timerDrivenEngineRef.get(), this.maxTimerDrivenThreads);
virtualThreadSchedulingAgent.setMaxThreadCount(maxThreadCount);
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

IIUC we now use the maxThreadCount value in two different places: the FlowEngine thread pool and the semaphore for the VT agent. Should we have separate properties?


import static java.util.Objects.requireNonNull;

public class FlowController implements ReportingTaskProvider, FlowAnalysisRuleProvider, Authorizable, NodeTypeProvider {
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

IIUC you didn't make any change on the shutdown path so we do not attempt to interrupt in-flight component virtual threads and the kill of the JVM will just kill threads. That seems wrong, no?

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

2 participants