Skip to content

Commit

Permalink
vault: enable availability modes to disable and enable background Art…
Browse files Browse the repository at this point in the history
…ifactSync thread
  • Loading branch information
pdowler committed Mar 19, 2024
1 parent 5276e40 commit e284c6e
Show file tree
Hide file tree
Showing 9 changed files with 125 additions and 42 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -117,9 +117,11 @@ public void run() {
DateFormat df = DateUtil.getDateFormat(DateUtil.IVOA_DATE_FORMAT, DateUtil.UTC);
if (harvestState.curLastModified != null) {
log.info("ArtifactSyncWorker.artifactQuery source=" + harvestState.getResourceID()
+ " instance=" + harvestState.instanceID + " start=" + df.format(harvestState.curLastModified));
+ " instance=" + harvestState.instanceID
+ " start=" + df.format(harvestState.curLastModified));
} else {
log.info("ArtifactSyncWorker.artifactQuery source=" + harvestState.getResourceID() + " instance=" + harvestState.instanceID);
log.info("ArtifactSyncWorker.artifactQuery source=" + harvestState.getResourceID()
+ " instance=" + harvestState.instanceID);
}

String uriBucket = null; // process all artifacts in a single thread
Expand Down Expand Up @@ -163,10 +165,12 @@ public void run() {
}
if (harvestState.curLastModified != null) {
log.info("ArtifactSyncWorker.artifactQuery source=" + harvestState.getResourceID()
+ " instance=" + harvestState.instanceID + " end=" + df.format(harvestState.curLastModified));
+ " instance=" + harvestState.instanceID
+ " end=" + df.format(harvestState.curLastModified));
} else {
log.info("ArtifactSyncWorker.artifactQuery source=" + harvestState.getResourceID()
+ " instance=" + harvestState.instanceID + " end=true");
+ " instance=" + harvestState.instanceID
+ " end=true");
}
}
}
10 changes: 7 additions & 3 deletions vault/src/main/java/org/opencadc/vault/NodePersistenceImpl.java
Original file line number Diff line number Diff line change
Expand Up @@ -166,18 +166,22 @@ public class NodePersistenceImpl implements NodePersistence {
private final Namespace storageNamespace;

private final boolean localGroupsOnly;
private URI resourceID;
private final URI resourceID;
private final boolean preventNotFound;

final String appName; // access by VaultTransferGenerator

// possibly temporary hack so migration tool can set this to false and
// preserve lastModified timestamps on nodes
public boolean nodeOrigin = true;

public NodePersistenceImpl(URI resourceID) {
public NodePersistenceImpl(URI resourceID, String appName) {
if (resourceID == null) {
throw new IllegalArgumentException("resource ID required");
}
this.resourceID = resourceID;
this.appName = appName;

MultiValuedProperties config = VaultInitAction.getConfig();
this.nodeDaoConfig = VaultInitAction.getDaoConfig(config);
this.invDaoConfig = VaultInitAction.getInvConfig(config);
Expand Down Expand Up @@ -259,7 +263,7 @@ public TransferGenerator getTransferGenerator() {
keyDAO.setConfig(kpDaoConfig);
PreauthKeyPair kp = keyDAO.get(VaultInitAction.KEY_PAIR_NAME);
TokenTool tt = new TokenTool(kp.getPublicKey(), kp.getPrivateKey());
return new VaultTransferGenerator(this, getArtifactDAO(), tt, preventNotFound);
return new VaultTransferGenerator(this, appName, getArtifactDAO(), tt, preventNotFound);
}

private NodeDAO getDAO() {
Expand Down
45 changes: 44 additions & 1 deletion vault/src/main/java/org/opencadc/vault/ServiceAvailability.java
Original file line number Diff line number Diff line change
Expand Up @@ -67,11 +67,17 @@

package org.opencadc.vault;

import ca.nrc.cadc.db.DBUtil;
import ca.nrc.cadc.rest.RestAction;
import ca.nrc.cadc.vosi.Availability;
import ca.nrc.cadc.vosi.AvailabilityPlugin;

import ca.nrc.cadc.vosi.avail.CheckDataSource;
import javax.naming.InitialContext;
import javax.naming.NamingException;
import javax.sql.DataSource;
import org.apache.log4j.Logger;
import org.opencadc.vault.metadata.ArtifactSync;

/**
* This class performs the work of determining if the executing artifact
Expand Down Expand Up @@ -130,7 +136,31 @@ public Availability getStatus() {
return new Availability(false, RestAction.STATE_READ_ONLY_MSG);
}

//TODO add availability checks for dependent services
// check database pools
DataSource ds;
String testSQL;
CheckDataSource cds;

ds = DBUtil.findJNDIDataSource("jdbc/nodes");
testSQL = "select * from vospace.ModelVersion";
cds = new CheckDataSource(ds, testSQL);
cds.check();

ds = DBUtil.findJNDIDataSource("jdbc/inventory");
testSQL = "select * from inventory.Artifact limit 1";
cds = new CheckDataSource(ds, testSQL);
cds.check();

ds = DBUtil.findJNDIDataSource("jdbc/inventory-iterator");
testSQL = "select * from inventory.Artifact limit 1";
cds = new CheckDataSource(ds, testSQL);
cds.check();

ds = DBUtil.findJNDIDataSource("jdbc/uws");
testSQL = "select * from uws.Job limit 1";
cds = new CheckDataSource(ds, testSQL);
cds.check();

} catch (Throwable t) {
// the test itself failed
log.debug("failure", t);
Expand All @@ -149,10 +179,13 @@ public void setState(String state) {
String key = appName + RestAction.STATE_MODE_KEY;
if (RestAction.STATE_OFFLINE.equalsIgnoreCase(state)) {
System.setProperty(key, RestAction.STATE_OFFLINE);
setOffline(true);
} else if (RestAction.STATE_READ_ONLY.equalsIgnoreCase(state)) {
System.setProperty(key, RestAction.STATE_READ_ONLY);
setOffline(true);
} else if (RestAction.STATE_READ_WRITE.equalsIgnoreCase(state)) {
System.setProperty(key, RestAction.STATE_READ_WRITE);
setOffline(false);
} else {
throw new IllegalArgumentException("invalid state: " + state
+ " expected: " + RestAction.STATE_READ_WRITE + "|"
Expand All @@ -170,4 +203,14 @@ private String getState() {
return ret;
}

private void setOffline(boolean offline) {
String jndiArtifactSync = appName + "-" + ArtifactSync.class.getName();
try {
InitialContext initialContext = new InitialContext();
ArtifactSync async = (ArtifactSync) initialContext.lookup(jndiArtifactSync);
async.setOffline(offline);
} catch (NamingException e) {
log.debug(String.format("unable to unbind %s - %s", jndiArtifactSync, e.getMessage()));
}
}
}
64 changes: 44 additions & 20 deletions vault/src/main/java/org/opencadc/vault/VaultInitAction.java
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
******************* CANADIAN ASTRONOMY DATA CENTRE *******************
************** CENTRE CANADIEN DE DONNÉES ASTRONOMIQUES **************
*
* (c) 2023. (c) 2023.
* (c) 2024. (c) 2024.
* Government of Canada Gouvernement du Canada
* National Research Council Conseil national de recherches
* Ottawa, Canada, K1A 0R6 Ottawa, Canada, K1A 0R6
Expand Down Expand Up @@ -134,10 +134,11 @@ public class VaultInitAction extends InitAction {
private List<String> allocationParents = new ArrayList<>();

private String jndiNodePersistence;
private String jndiPreauthKeys;
private String jndiPreauthKeys; // store pubkey in JNDI for download via GetKeyAction
private String jndiArtifactSync; // store in JNDI to support availability mode change
private String jndiSiteAvailabilities;
private Thread availabilityCheck;
private Thread artifactSync;
private Thread artifactSyncThread;

public VaultInitAction() {
super();
Expand Down Expand Up @@ -369,7 +370,7 @@ private void initNodePersistence() {
} catch (NamingException ignore) {
log.debug("unbind previous JNDI key (" + jndiNodePersistence + ") failed... ignoring");
}
NodePersistence npi = new NodePersistenceImpl(resourceID);
NodePersistence npi = new NodePersistenceImpl(resourceID, appName);
ctx.bind(jndiNodePersistence, npi);

log.info("initNodePersistence: created JNDI key: " + jndiNodePersistence + " impl: " + npi.getClass().getName());
Expand Down Expand Up @@ -455,31 +456,54 @@ private void terminateAvailabilityCheck() {
}

private void initBackgroundWorkers() {
HarvestStateDAO hsDAO = new HarvestStateDAO();
hsDAO.setConfig(vosDaoConfig);

ArtifactDAO artifactDAO = new ArtifactDAO();
Map<String,Object> iterprops = getIteratorConfig(props);
log.warn("iterator pool: " + iterprops.get("jndiDataSourceName"));
artifactDAO.setConfig(iterprops);

terminateBackgroundWorkers();
this.artifactSync = new Thread(new ArtifactSync(hsDAO, artifactDAO, storageNamespace));
artifactSync.setDaemon(true);
artifactSync.start();
try {
HarvestStateDAO hsDAO = new HarvestStateDAO();
hsDAO.setConfig(vosDaoConfig);

ArtifactDAO artifactDAO = new ArtifactDAO();
Map<String,Object> iterprops = getIteratorConfig(props);
log.warn("iterator pool: " + iterprops.get("jndiDataSourceName"));
artifactDAO.setConfig(iterprops);

terminateBackgroundWorkers();
ArtifactSync async = new ArtifactSync(hsDAO, artifactDAO, storageNamespace);
this.artifactSyncThread = new Thread(async);
artifactSyncThread.setDaemon(true);
artifactSyncThread.start();

// store in JNDI so availability can set offline
String jndiArtifactSync = appName + "-" + ArtifactSync.class.getName();
InitialContext ctx = new InitialContext();
try {
ctx.unbind(jndiArtifactSync);
} catch (NamingException ignore) {
log.debug("unbind previous JNDI key (" + jndiPreauthKeys + ") failed... ignoring");
}
ctx.bind(jndiArtifactSync, async);
log.info("initBackgroundWorkers: created JNDI key: " + jndiArtifactSync);
} catch (Exception ex) {
throw new RuntimeException("check/init ArtifactSync failed", ex);
}
}

private void terminateBackgroundWorkers() {
if (this.artifactSync != null) {
if (this.artifactSyncThread != null) {
try {
log.info("terminating ArtifactSync Thread...");
this.artifactSync.interrupt();
this.artifactSync.join();
this.artifactSyncThread.interrupt();
this.artifactSyncThread.join();
log.info("terminating ArtifactSync Thread... [OK]");
} catch (Throwable t) {
log.info("failed to terminate ArtifactSync thread", t);
} finally {
this.artifactSync = null;
this.artifactSyncThread = null;
}

try {
InitialContext initialContext = new InitialContext();
initialContext.unbind(this.jndiArtifactSync);
} catch (NamingException e) {
log.debug(String.format("unable to unbind %s - %s", this.jndiArtifactSync, e.getMessage()));
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -117,15 +117,16 @@ public class VaultTransferGenerator implements TransferGenerator {
private final Map<URI, Availability> siteAvailabilities;

@SuppressWarnings("unchecked")
public VaultTransferGenerator(NodePersistenceImpl nodePersistence, ArtifactDAO artifactDAO, TokenTool tokenTool, boolean preventNotFound) {
public VaultTransferGenerator(NodePersistenceImpl nodePersistence, String appName,
ArtifactDAO artifactDAO, TokenTool tokenTool, boolean preventNotFound) {
this.nodePersistence = nodePersistence;
this.authorizer = new VOSpaceAuthorizer(nodePersistence);
this.artifactDAO = artifactDAO;
this.tokenTool = tokenTool;
this.preventNotFound = preventNotFound;

// TODO: get appname from ???
String siteAvailabilitiesKey = "vault" + "-" + StorageSiteAvailabilityCheck.class.getName();
String siteAvailabilitiesKey = appName + "-" + StorageSiteAvailabilityCheck.class.getName();
log.debug("siteAvailabilitiesKey: " + siteAvailabilitiesKey);
try {
Context initContext = new InitialContext();
Expand Down
2 changes: 0 additions & 2 deletions vault/src/main/java/org/opencadc/vault/files/GetAction.java
Original file line number Diff line number Diff line change
Expand Up @@ -70,12 +70,10 @@
import ca.nrc.cadc.auth.AuthenticationUtil;
import ca.nrc.cadc.net.TransientException;
import java.net.HttpURLConnection;
import java.net.URI;
import java.util.List;
import javax.security.auth.Subject;
import org.apache.log4j.Logger;
import org.opencadc.vospace.DataNode;
import org.opencadc.vospace.NodeProperty;
import org.opencadc.vospace.VOS;
import org.opencadc.vospace.VOSURI;
import org.opencadc.vospace.server.NodeFault;
Expand Down
20 changes: 16 additions & 4 deletions vault/src/main/java/org/opencadc/vault/metadata/ArtifactSync.java
Original file line number Diff line number Diff line change
Expand Up @@ -88,8 +88,8 @@ public class ArtifactSync implements Runnable {
private static final Logger log = Logger.getLogger(ArtifactSync.class);

private static final long ROUNDS = 6000L; // 6 sec
private static final long SHORT_SLEEP = 2 * ROUNDS;
private static final long LONG_SLEEP = 5 * ROUNDS;
private static final long SHORT_SLEEP = 5 * ROUNDS;
private static final long LONG_SLEEP = 10 * ROUNDS;
private static final long EVICT_AGE = 12 * ROUNDS;

private static final long FAIL_SLEEP = 10 * ROUNDS; // slightly smaller than evict
Expand All @@ -99,6 +99,8 @@ public class ArtifactSync implements Runnable {
private final ArtifactDAO artifactDAO;
private final Namespace artifactNamespace;

private boolean offline = false;

public ArtifactSync(HarvestStateDAO stateDAO, ArtifactDAO artifactDAO, Namespace artifactNamespace) {
this.stateDAO = stateDAO;
this.artifactDAO = artifactDAO;
Expand All @@ -113,14 +115,24 @@ public ArtifactSync(HarvestStateDAO stateDAO, ArtifactDAO artifactDAO, Namespace
stateDAO.setMaintCount(9999); // every 1e4
}

public void setOffline(boolean offline) {
this.offline = offline;

}

@Override
public void run() {
String name = Artifact.class.getSimpleName();
URI resourceID = URI.create("jdbc:inventory");
try {
Thread.sleep(SHORT_SLEEP);
Thread.sleep(1 * ROUNDS); // delay startup a bit

while (true) {
while (offline) {
log.warn("disabled: sleep=" + LONG_SLEEP);
Thread.sleep(LONG_SLEEP);
}

log.debug("check leader " + instanceID);
HarvestState state = stateDAO.get(name, resourceID);
log.debug("check leader " + instanceID + " found: " + state);
Expand All @@ -135,7 +147,7 @@ public void run() {
boolean leader = checkLeaderStatus(state);

if (leader) {
log.info("LEADER " + state.instanceID);
log.debug("leader: " + state);
boolean fail = false;
try {
ArtifactSyncWorker worker = new ArtifactSyncWorker(stateDAO, state, artifactDAO, artifactNamespace);
Expand Down
5 changes: 3 additions & 2 deletions vault/src/main/webapp/META-INF/context.xml
Original file line number Diff line number Diff line change
Expand Up @@ -27,14 +27,15 @@
removeAbandoned="false"
testOnBorrow="true" validationQuery="select 123" />

<!-- NOTE single connection is hard coded here!! -->
<!-- pool size is hard coded here: 2 so that the availability can check the pool
without blocking while the iterator is running -->
<Resource name="jdbc/inventory-iterator"
auth="Container"
type="javax.sql.DataSource"
factory="org.apache.tomcat.jdbc.pool.DataSourceFactory" closeMethod="close"
minEvictableIdleTimeMillis="60000" timeBetweenEvictionRunsMillis="30000"
maxWait="20000"
initialSize="0" minIdle="0" maxIdle="1" maxActive="1"
initialSize="0" minIdle="0" maxIdle="2" maxActive="2"
username="${org.opencadc.vault.inventory.username}" password="${org.opencadc.vault.inventory.password}"
driverClassName="org.postgresql.Driver" url="${org.opencadc.vault.inventory.url}"
removeAbandoned="false"
Expand Down
4 changes: 0 additions & 4 deletions vault/src/main/webapp/WEB-INF/web.xml
Original file line number Diff line number Diff line change
Expand Up @@ -239,10 +239,6 @@
<param-name>ca.nrc.cadc.vosi.AvailabilityPlugin</param-name>
<param-value>org.opencadc.vault.ServiceAvailability</param-value>
</init-param>
<init-param>
<param-name>availabilityProperties</param-name>
<param-value>vault-availability.properties</param-value>
</init-param>
<load-on-startup>3</load-on-startup>
</servlet>

Expand Down

0 comments on commit e284c6e

Please sign in to comment.