Skip to content

Commit

Permalink
Merge pull request #521 from m-click/bugfix/sqlfeaturestore/lost-updates
Browse files Browse the repository at this point in the history
SQLFeatureStore: Fix lost updates in multi-part transactions
  • Loading branch information
MrSnyder committed Apr 30, 2015
2 parents 62463a9 + e327d57 commit a9af1fc
Show file tree
Hide file tree
Showing 2 changed files with 82 additions and 49 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,6 @@
----------------------------------------------------------------------------*/
package org.deegree.feature.persistence.sql;

import static org.deegree.commons.utils.JDBCUtils.close;
import static org.deegree.commons.xml.CommonNamespaces.OGCNS;
import static org.deegree.commons.xml.CommonNamespaces.XLNNS;
import static org.deegree.commons.xml.CommonNamespaces.XSINS;
Expand Down Expand Up @@ -143,13 +142,12 @@

/**
* {@link FeatureStore} that is backed by a spatial SQL database.
*
*
* @see SQLDialect
*
* @author <a href="mailto:schneider@lat-lon.de">Markus Schneider</a>
* @author last edited by: $Author$
*
* @version $Revision$, $Date$
*
* @author <a href="mailto:schneider@occamlabs.de">Markus Schneider</a>
*
* @since 3.2
*/
@LoggingNotes(info = "logs particle converter initialization", debug = "logs the SQL statements sent to the SQL server and startup/shutdown information")
public class SQLFeatureStore implements FeatureStore {
Expand Down Expand Up @@ -200,9 +198,11 @@ public class SQLFeatureStore implements FeatureStore {

private ConnectionProvider connProvider;

private final ThreadLocal<SQLFeatureStoreTransaction> transaction = new ThreadLocal<SQLFeatureStoreTransaction>();

/**
* Creates a new {@link SQLFeatureStore} for the given configuration.
*
*
* @param config
* jaxb configuration object
* @param configURL
Expand Down Expand Up @@ -328,7 +328,7 @@ public String getConnId() {

/**
* Returns the relational mapping for the given feature type name.
*
*
* @param ftName
* name of the feature type, must not be <code>null</code>
* @return relational mapping for the feature type, may be <code>null</code> (no relational mapping)
Expand All @@ -340,7 +340,7 @@ public FeatureTypeMapping getMapping( QName ftName ) {
/**
* Returns a {@link ParticleConverter} for the given {@link Mapping} instance from the served
* {@link MappedAppSchema}.
*
*
* @param mapping
* particle mapping, must not be <code>null</code>
* @return particle converter, never <code>null</code>
Expand All @@ -365,10 +365,12 @@ public Envelope calcEnvelope( QName ftName )
Envelope env = null;
Connection conn = null;
try {
conn = connProvider.getConnection();
conn = getConnection();
env = calcEnvelope( ftName, conn );
} catch ( SQLException e ) {
throw new FeatureStoreException( e.getMessage() );
} finally {
JDBCUtils.close( conn );
release( null, null, conn );
}
return env;
}
Expand Down Expand Up @@ -428,7 +430,7 @@ private Envelope calcEnvelope( FeatureTypeMapping ftMapping, Connection conn )
LOG.debug( e.getMessage(), e );
throw new FeatureStoreException( e.getMessage(), e );
} finally {
close( rs, stmt, null, LOG );
release( rs, stmt, null );
}
return env;
}
Expand Down Expand Up @@ -463,7 +465,7 @@ private Envelope calcEnvelope( QName ftName, BlobMapping blobMapping, Connection
LOG.debug( e.getMessage(), e );
throw new FeatureStoreException( e.getMessage(), e );
} finally {
close( rs, stmt, null, LOG );
release( rs, stmt, null );
}
return env;
}
Expand Down Expand Up @@ -505,7 +507,7 @@ private GMLObject getObjectByIdBlob( String id, BlobMapping blobMapping )
sql.append( blobMapping.getGMLIdColumn() );
sql.append( "=?" );

conn = connProvider.getConnection();
conn = getConnection();
stmt = conn.prepareStatement( sql.toString() );
stmt.setFetchSize( fetchSize );
stmt.setString( 1, id );
Expand All @@ -524,7 +526,7 @@ private GMLObject getObjectByIdBlob( String id, BlobMapping blobMapping )
LOG.debug( msg, e );
throw new FeatureStoreException( msg, e );
} finally {
close( rs, stmt, conn, LOG );
release( rs, stmt, conn );
}
return geomOrFeature;
}
Expand Down Expand Up @@ -561,20 +563,31 @@ public LockManager getLockManager()
@Override
public FeatureStoreTransaction acquireTransaction()
throws FeatureStoreException {
FeatureStoreTransaction ta = null;
SQLFeatureStoreTransaction ta = null;
try {
Connection conn = getConnection();
final Connection conn = getConnection();
conn.setAutoCommit( false );
ta = new SQLFeatureStoreTransaction( this, conn, getSchema(), inspectors );
transaction.set( ta );
} catch ( SQLException e ) {
throw new FeatureStoreException( "Unable to acquire JDBC connection for transaction: " + e.getMessage(), e );
}
return ta;
}

void closeAndDetachTransactionConnection() throws FeatureStoreException {
try {
transaction.get().getConnection().close();
} catch ( final SQLException e ) {
LOG.error( "Error closing connection/removing it from the pool: " + e.getMessage() );
} finally {
transaction.remove();
}
}

/**
* Returns the {@link FeatureStoreCache}.
*
*
* @return feature store cache, can be <code>null</code> (no cache configured)
*/
public FeatureStoreCache getCache() {
Expand All @@ -583,7 +596,7 @@ public FeatureStoreCache getCache() {

/**
* Returns a resolver instance for resolving references to objects that are stored in this feature store.
*
*
* @return resolver, never <code>null</code>
*/
public GMLReferenceResolver getResolver() {
Expand Down Expand Up @@ -659,7 +672,6 @@ private int queryHitsByOperatorFilter( Query query, QName ftName, OperatorFilter
AbstractWhereBuilder wb = getWhereBuilder( ft, filter, query.getSortProperties(), conn );

if ( wb.getPostFilter() != null ) {
conn.close();
LOG.debug( "Filter not fully mappable to WHERE clause. Need to iterate over all features to determine count." );
hits = queryByOperatorFilter( query, ftName, filter ).count();
} else {
Expand Down Expand Up @@ -732,7 +744,7 @@ private int queryHitsByOperatorFilter( Query query, QName ftName, OperatorFilter
LOG.error( msg, e );
throw new FeatureStoreException( msg, e );
} finally {
JDBCUtils.close( rs, stmt, conn, LOG );
release( rs, stmt, conn );
}

return hits;
Expand Down Expand Up @@ -798,7 +810,7 @@ private int queryHitsByOperatorFilterBlob( Query query, QName ftName, OperatorFi
LOG.error( msg, e );
throw new FeatureStoreException( msg, e );
} finally {
close( rs, stmt, conn, LOG );
release( rs, stmt, conn );
}

return hits;
Expand Down Expand Up @@ -826,7 +838,7 @@ public Map<String, String> getNamespaceContext() {

/**
* Returns a transformed version of the given {@link Geometry} in the specified CRS.
*
*
* @param literal
* @param crs
* @return transformed version of the geometry, never <code>null</code>
Expand Down Expand Up @@ -970,19 +982,19 @@ private FeatureInputStream queryByIdFilterBlob( IdFilter filter, SortProperty[]
long begin = System.currentTimeMillis();
stmt = conn.prepareStatement( "SELECT gml_id,binary_object FROM " + blobMapping.getTable()
+ " A WHERE A.gml_id in (" + sb + ")" );
LOG.debug( "Preparing SELECT took {} [ms] ", System.currentTimeMillis() - begin );
LOG.debug( "Preparing SELECT took {} [ms] ", System.currentTimeMillis() - begin );
stmt.setFetchSize( fetchSize );
int idx = 0;
for ( String id : filter.getMatchingIds() ) {
stmt.setString( ++idx, id );
}
begin = System.currentTimeMillis();
rs = stmt.executeQuery();
LOG.debug( "Executing SELECT took {} [ms] ", System.currentTimeMillis() - begin );
LOG.debug( "Executing SELECT took {} [ms] ", System.currentTimeMillis() - begin );
FeatureBuilder builder = new FeatureBuilderBlob( this, blobMapping );
result = new IteratorFeatureInputStream( new FeatureResultSetIterator( builder, rs, conn, stmt ) );
} catch ( Exception e ) {
close( rs, stmt, conn, LOG );
release( rs, stmt, conn );
String msg = "Error performing id query: " + e.getMessage();
LOG.debug( msg, e );
throw new FeatureStoreException( msg, e );
Expand Down Expand Up @@ -1088,7 +1100,7 @@ private FeatureInputStream queryByIdFilterRelational( IdFilter filter, SortPrope
LOG.debug( "Executing SELECT took {} [ms] ", System.currentTimeMillis() - begin );
result = new IteratorFeatureInputStream( new FeatureResultSetIterator( builder, rs, conn, stmt ) );
} catch ( Exception e ) {
close( rs, stmt, conn, LOG );
release( rs, stmt, conn );
String msg = "Error performing query by id filter (relational mode): " + e.getMessage();
LOG.error( msg, e );
throw new FeatureStoreException( msg, e );
Expand All @@ -1098,11 +1110,26 @@ private FeatureInputStream queryByIdFilterRelational( IdFilter filter, SortPrope

protected Connection getConnection()
throws SQLException {
Connection conn = connProvider.getConnection();
if ( isTransactionActive() ) {
return transaction.get().getConnection();
}
final Connection conn = connProvider.getConnection();
conn.setAutoCommit( readAutoCommit );
return conn;
}

private void release( final ResultSet rs, final Statement stmt, final Connection conn ) {
if ( isTransactionActive() ) {
JDBCUtils.close( rs, stmt, null, LOG );
} else {
JDBCUtils.close( rs, stmt, conn, LOG );
}
}

private boolean isTransactionActive () {
return transaction.get() != null;
}

private FeatureInputStream queryByOperatorFilterBlob( Query query, QName ftName, OperatorFilter filter )
throws FeatureStoreException {

Expand Down Expand Up @@ -1229,7 +1256,7 @@ private FeatureInputStream queryByOperatorFilterBlob( Query query, QName ftName,

result = new IteratorFeatureInputStream( new FeatureResultSetIterator( builder, rs, conn, stmt ) );
} catch ( Exception e ) {
close( rs, stmt, conn, LOG );
release( rs, stmt, conn );
String msg = "Error performing query by operator filter: " + e.getMessage();
LOG.error( msg, e );
throw new FeatureStoreException( msg, e );
Expand Down Expand Up @@ -1344,7 +1371,7 @@ private FeatureInputStream queryByOperatorFilter( Query query, QName ftName, Ope

result = new IteratorFeatureInputStream( new FeatureResultSetIterator( builder, rs, conn, stmt ) );
} catch ( Exception e ) {
close( rs, stmt, conn, LOG );
release( rs, stmt, conn );
String msg = "Error performing query by operator filter: " + e.getMessage();
LOG.error( msg, e );
throw new FeatureStoreException( msg, e );
Expand Down Expand Up @@ -1413,7 +1440,7 @@ private FeatureInputStream queryMultipleFts( Query[] queries, Envelope looseBBox
final FeatureBuilder builder = new FeatureBuilderBlob( this, blobMapping );
result = new IteratorFeatureInputStream( new FeatureResultSetIterator( builder, rs, conn, stmt ) );
} catch ( Exception e ) {
close( rs, stmt, conn, LOG );
release( rs, stmt, conn );
String msg = "Error performing query: " + e.getMessage();
LOG.debug( msg );
LOG.trace( "Stack trace:", e );
Expand Down Expand Up @@ -1475,9 +1502,23 @@ private class FeatureResultSetIterator extends ResultSetIterator<Feature> {

private final FeatureBuilder builder;

private final ResultSet rs;

private final Connection conn;

private final Statement stmt;

public FeatureResultSetIterator( FeatureBuilder builder, ResultSet rs, Connection conn, Statement stmt ) {
super( rs, conn, stmt );
this.builder = builder;
this.rs = rs;
this.conn = conn;
this.stmt = stmt;
}

@Override
public void close() {
release( rs, stmt, conn );
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -113,11 +113,11 @@

/**
* {@link FeatureStoreTransaction} implementation for {@link SQLFeatureStore}.
*
*
* @author <a href="mailto:schneider@lat-lon.de">Markus Schneider</a>
* @author <a href="mailto:schmitz@lat-lon.de">Andreas Schmitz</a>
* @author last edited by: $Author$
*
*
* @version $Revision$, $Date$
*/
public class SQLFeatureStoreTransaction implements FeatureStoreTransaction {
Expand All @@ -141,7 +141,7 @@ public class SQLFeatureStoreTransaction implements FeatureStoreTransaction {

/**
* Creates a new {@link SQLFeatureStoreTransaction} instance.
*
*
* @param store
* corresponding feature store instance, must not be <code>null</code>
* @param conn
Expand Down Expand Up @@ -181,11 +181,7 @@ public void commit()
LOG.debug( t.getMessage(), t );
throw new FeatureStoreException( "Unable to commit SQL transaction: " + t.getMessage() );
} finally {
try {
conn.close();
} catch ( SQLException e ) {
LOG.error( "Error closing connection/removing it from the pool." );
}
fs.closeAndDetachTransactionConnection();
}
}

Expand Down Expand Up @@ -237,11 +233,7 @@ public void rollback()
LOG.debug( e.getMessage(), e );
throw new FeatureStoreException( "Unable to rollback SQL transaction: " + e.getMessage() );
} finally {
try {
conn.close();
} catch ( SQLException e ) {
LOG.error( "Error closing connection/removing it from the pool." );
}
fs.closeAndDetachTransactionConnection();
}
}

Expand All @@ -253,7 +245,7 @@ public FeatureStore getStore() {
/**
* Returns the underlying JDBC connection. Can be used for performing other operations in the same transaction
* context.
*
*
* @return the underlying JDBC connection, never <code>null</code>
*/
public Connection getConnection() {
Expand Down Expand Up @@ -377,7 +369,7 @@ private int deleteFeatureRow( IdAnalysis analysis )
* <p>
* Deletes all joined rows and transitive join rows, but stops at joins to subfeature tables.
* </p>
*
*
* @param fid
* feature id, must not be <code>null</code>
* @throws FeatureStoreException
Expand Down Expand Up @@ -689,7 +681,7 @@ private String generateNewId() {

/**
* Inserts the given feature into BLOB table and returns the generated primary key.
*
*
* @param stmt
* @param feature
* @return primary key of the feature
Expand Down Expand Up @@ -764,7 +756,7 @@ private List<String> performUpdateBlob( final QName ftName, final List<ParsedPro
blobUpdateStmt = conn.prepareStatement( sql.toString() );
features = fs.query( query );
for ( final Feature feature : features ) {
new FeatureUpdater().update( feature, replacementProps );
new FeatureUpdater().update( feature, replacementProps );
updateFeatureBlob( blobUpdateStmt, feature );
updatedFids.add( feature.getId() );
}
Expand Down

0 comments on commit a9af1fc

Please sign in to comment.