Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fixing synchronization in SemaphoreStep #235

Merged
merged 1 commit into from
Sep 12, 2023
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,6 @@

package org.jenkinsci.plugins.workflow.test.steps;

import com.google.inject.Inject;
import hudson.Extension;
import hudson.model.Run;
import java.io.File;
Expand All @@ -36,34 +35,40 @@
import java.util.Set;
import edu.umd.cs.findbugs.annotations.CheckForNull;
import edu.umd.cs.findbugs.annotations.NonNull;
import java.util.logging.Level;
import java.util.logging.Logger;
import jenkins.model.Jenkins;
import org.jenkinsci.plugins.workflow.steps.AbstractStepDescriptorImpl;
import org.jenkinsci.plugins.workflow.steps.AbstractStepExecutionImpl;
import org.jenkinsci.plugins.workflow.steps.AbstractStepImpl;
import org.jenkinsci.plugins.workflow.steps.Step;
import org.jenkinsci.plugins.workflow.steps.StepContext;
import org.jenkinsci.plugins.workflow.steps.StepDescriptor;
import org.jenkinsci.plugins.workflow.steps.StepExecution;
import org.jvnet.hudson.test.JenkinsRule;
import org.kohsuke.stapler.DataBoundConstructor;

/**
* Step that blocks until signaled.
* Starts running and waits for {@link #success(Object)} or {@link #failure(Throwable)} to be called, if they have not been already.
* Starts running and waits for {@link #success(String, Object)} or {@link #failure(String, Throwable)} to be called, if they have not been already.
*/
public final class SemaphoreStep extends AbstractStepImpl implements Serializable {
public final class SemaphoreStep extends Step implements Serializable {
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Cleaning up deprecations.


private static final Logger LOGGER = Logger.getLogger(SemaphoreStep.class.getName());

/** State of semaphore steps within one Jenkins home and thus (possibly restarting) test. */
private static final class State {
private static final Map<File,State> states = new HashMap<File,State>();
private static final Map<File,State> states = new HashMap<>();
static synchronized State get() {
File home = Jenkins.get().getRootDir();
State state = states.get(home);
if (state == null) {
LOGGER.info(() -> "Initializing state in " + home);
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Added some extra logging here and in stop in case the fault lied there, but it does not seem to have.

state = new State();
states.put(home, state);
}
return state;
}
private State() {}
private final Map<String,Integer> iota = new HashMap<String,Integer>();
private final Map<String,Integer> iota = new HashMap<>();
synchronized int allocateNumber(String id) {
Integer old = iota.get(id);
if (old == null) {
Expand All @@ -74,10 +79,10 @@ synchronized int allocateNumber(String id) {
return number;
}
/** map from {@link #k} to serial form of {@link StepContext} */
final Map<String,String> contexts = new HashMap<String,String>();
final Map<String,Object> returnValues = new HashMap<String,Object>();
final Map<String,Throwable> errors = new HashMap<String,Throwable>();
final Set<String> started = new HashSet<String>();
final Map<String,String> contexts = new HashMap<>();
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

just code cleanup

final Map<String,Object> returnValues = new HashMap<>();
final Map<String,Throwable> errors = new HashMap<>();
final Set<String> started = new HashSet<>();
}

private final String id;
Expand All @@ -96,47 +101,61 @@ private String k() {
return id + "/" + number;
}

/** Marks the step as having successfully completed; or, if not yet started, makes it do so synchronously when started. */
/**@deprecated use {@link #success(String, Object)} */
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Not sure if there are any tests actually obtaining this object directly. The normal usage involves solely static calls.

@Deprecated
public void success(Object returnValue) {
success(k(), returnValue);
}

/** Marks the step as having successfully completed; or, if not yet started, makes it do so synchronously when started. */
public static void success(String k, Object returnValue) {
State s = State.get();
StepContext c;
synchronized (s) {
if (!s.contexts.containsKey(k)) {
System.err.println("Planning to unblock " + k + " as success");
LOGGER.info(() -> "Planning to unblock " + k + " as success");
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Using Logger is better style and gives us timestamps too.

s.returnValues.put(k, returnValue);
return;
}
c = getContext(s, k);
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

One possible point where synchronization was lacking.

}
System.err.println("Unblocking " + k + " as success");
getContext(s, k).onSuccess(returnValue);
LOGGER.info(() -> "Unblocking " + k + " as success");
c.onSuccess(returnValue);
}

/** Marks the step as having failed; or, if not yet started, makes it do so synchronously when started. */
/** @deprecated use {@link #failure(String, Throwable)} */
@Deprecated
public void failure(Throwable error) {
failure(k(), error);
}

/** Marks the step as having failed; or, if not yet started, makes it do so synchronously when started. */
public static void failure(String k, Throwable error) {
State s = State.get();
StepContext c;
synchronized (s) {
if (!s.contexts.containsKey(k)) {
System.err.println("Planning to unblock " + k + " as failure");
LOGGER.info(() -> "Planning to unblock " + k + " as failure");
s.errors.put(k, error);
return;
}
c = getContext(s, k);
}
System.err.println("Unblocking " + k + " as failure");
getContext(s, k).onFailure(error);
LOGGER.info(() -> "Unblocking " + k + " as failure");
c.onFailure(error);
}


/** @deprecated should not be needed */
@Deprecated
public StepContext getContext() {
return getContext(State.get(), k());
State s = State.get();
synchronized (s) {
return getContext(s, k());
}
}

private static StepContext getContext(State s, String k) {
assert Thread.holdsLock(s);
return (StepContext) Jenkins.XSTREAM.fromXML(s.contexts.get(k));
}

Expand All @@ -147,29 +166,50 @@ public static void waitForStart(@NonNull String k, @CheckForNull Run<?,?> b) thr
if (b != null && !b.isBuilding()) {
throw new AssertionError(JenkinsRule.getLog(b));
}
s.wait(1000);
s.wait(100);
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Might shave off a fraction of a second from tests to check this more promptly.

}
}
}

@Override public StepExecution start(StepContext context) throws Exception {
return new Execution(context, k());
}

public static class Execution extends AbstractStepExecutionImpl {

@Inject(optional=true) private SemaphoreStep step;
private String k;
private final String k;

Execution(StepContext context, String k) {
super(context);
this.k = k;
}

@Override public boolean start() throws Exception {
State s = State.get();
k = step.k();
boolean sync = true;
if (s.returnValues.containsKey(k)) {
System.err.println("Immediately running " + k);
getContext().onSuccess(s.returnValues.get(k));
} else if (s.errors.containsKey(k)) {
System.err.println("Immediately failing " + k);
getContext().onFailure(s.errors.get(k));
Object returnValue = null;
Throwable error = null;
boolean success = false, failure = false, sync = true;
synchronized (s) {
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Another point where synchronization was inadequate. Rather cumbersome idiom to avoid holding the lock while making Pipeline API calls, to avoid possible deadlocks.

if (s.returnValues.containsKey(k)) {
success = true;
returnValue = s.returnValues.get(k);
} else if (s.errors.containsKey(k)) {
failure = true;
error = s.errors.get(k);
}
}
if (success) {
LOGGER.info(() -> "Immediately running " + k);
getContext().onSuccess(returnValue);
} else if (failure) {
LOGGER.info(() -> "Immediately failing " + k);
getContext().onFailure(error);
} else {
System.err.println("Blocking " + k);
s.contexts.put(k, Jenkins.XSTREAM.toXML(getContext()));
LOGGER.info(() -> "Blocking " + k);
String c = Jenkins.XSTREAM.toXML(getContext());
synchronized (s) {
s.contexts.put(k, c);
}
sync = false;
}
synchronized (s) {
Expand All @@ -181,23 +221,25 @@ public static class Execution extends AbstractStepExecutionImpl {

@Override public void stop(Throwable cause) throws Exception {
State s = State.get();
s.contexts.remove(k);
synchronized (s) {
s.contexts.remove(k);
}
LOGGER.log(Level.INFO, cause, () -> "Stopping " + k);
super.stop(cause);
}

@Override public String getStatus() {
return State.get().contexts.containsKey(k) ? "waiting on " + k : "finished " + k;
State s = State.get();
synchronized (s) {
return s.contexts.containsKey(k) ? "waiting on " + k : "finished " + k;
}
}

private static final long serialVersionUID = 1L;

}

@Extension public static final class DescriptorImpl extends AbstractStepDescriptorImpl {

public DescriptorImpl() {
super(Execution.class);
}
@Extension public static final class DescriptorImpl extends StepDescriptor {

@Override public String getFunctionName() {
return "semaphore";
Expand All @@ -207,6 +249,10 @@ public DescriptorImpl() {
return "Test step";
}

@Override public Set<? extends Class<?>> getRequiredContext() {
return Set.of();
}

}

private static final long serialVersionUID = 1L;
Expand Down