-
Notifications
You must be signed in to change notification settings - Fork 4.1k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Job history purging #4575
Job history purging #4575
Changes from 7 commits
a603860
a03953e
e2bbf59
40fc03a
30db1cb
ff60e26
ff758ed
e1d9533
5039d12
b94f218
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -82,6 +82,11 @@ | |
|
||
public class DefaultJobPersistence implements JobPersistence { | ||
|
||
  // Not final because the job history test case manipulates these to exercise combinations.
  // Minimum age, in days, a job must reach before it becomes eligible for purging.
  public static int JOB_HISTORY_MINIMUM_AGE_IN_DAYS = 15;
  // Number of most-recent jobs per scope/connection that are always retained.
  public static int JOB_HISTORY_MINIMUM_RECENCY = 5;
  // Per-scope job-count threshold beyond which the age requirement is ignored when purging.
  public static int JOB_HISTORY_EXCESSIVE_NUMBER_OF_JOBS = 10;
|
||
private static final Logger LOGGER = LoggerFactory.getLogger(DefaultJobPersistence.class); | ||
private static final Set<String> SYSTEM_SCHEMA = Set | ||
.of("pg_toast", "information_schema", "pg_catalog", "import_backup", "pg_internal", | ||
|
@@ -506,6 +511,48 @@ private List<String> listTables(final String schema) throws IOException { | |
} | ||
} | ||
|
||
@Override | ||
public void purgeJobHistory(LocalDateTime asOfDate) throws IOException { | ||
final String JOB_HISTORY_PURGE_SQL = "delete from jobs where jobs.id in (" | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. This would be easier to read as a resource There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Done |
||
+ "select jobs.id \n" | ||
+ "from jobs \n" | ||
+ "left join (\n" | ||
+ " select scope, count(jobs.id) as jobCount from jobs group by scope\n" | ||
+ ") counts on jobs.scope = counts.scope \n" | ||
+ "where \n" | ||
+ " -- job must be at least MINIMUM_AGE_IN_DAYS old or connection has more than EXCESSIVE_NUMBER_OF_JOBS \n" | ||
+ " (jobs.created_at < (TO_TIMESTAMP(?, 'YYYY-MM-DD') - interval '" + (JOB_HISTORY_MINIMUM_AGE_IN_DAYS - 1) | ||
+ "' day) or counts.jobCount > ?) \n" | ||
+ "and jobs.id not in (\n" | ||
+ " -- cannot be the most recent job with saved state \n" | ||
+ " select job_id as latest_job_id_with_state from (\n" | ||
+ " select jobs.scope, \n" | ||
+ " jobs.id as job_id, jobs.config_type, jobs.created_at, jobs.status, \n" | ||
+ " bool_or(attempts.\"output\" -> 'sync' -> 'state' -> 'state' is not null) as outputStateExists,\n" | ||
+ " row_number() OVER (PARTITION BY scope ORDER BY jobs.created_at desc, jobs.id desc) as stateRecency\n" | ||
+ " from jobs left join attempts on jobs.id = attempts.job_id \n" | ||
+ " group by scope, jobs.id\n" | ||
+ " having bool_or(attempts.\"output\" -> 'sync' -> 'state' -> 'state' is not null) = true\n" | ||
+ " order by scope, jobs.created_at desc, jobs.id desc\n" | ||
+ " ) jobs_with_state where stateRecency=1\n " | ||
+ ") and jobs.id not in (\n" | ||
+ " -- cannot be one of the last MINIMUM_RECENCY jobs for that connection/scope\n" | ||
+ " select id from (\n" | ||
+ " select jobs.scope, jobs.id, jobs.created_at,\n" | ||
+ " row_number() OVER (PARTITION BY scope ORDER BY jobs.created_at desc, jobs.id desc) as recency\n" | ||
+ " from jobs\n" | ||
+ " group by scope, jobs.id\n" | ||
+ " order by scope, jobs.created_at desc, jobs.id desc\n" | ||
+ " ) jobs_by_recency \n" | ||
+ " where recency <= ?\n" | ||
+ "))"; | ||
|
||
final Integer rows = database.query(ctx -> ctx.execute(JOB_HISTORY_PURGE_SQL, | ||
asOfDate.format(DateTimeFormatter.ofPattern("YYYY-MM-dd")), | ||
JOB_HISTORY_EXCESSIVE_NUMBER_OF_JOBS, | ||
JOB_HISTORY_MINIMUM_RECENCY)); | ||
} | ||
|
||
private List<String> listAllTables(final String schema) throws IOException { | ||
if (schema != null) { | ||
return database.query(context -> context.meta().getSchemas(schema).stream() | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -31,6 +31,7 @@ | |
import io.airbyte.scheduler.models.JobStatus; | ||
import java.io.IOException; | ||
import java.nio.file.Path; | ||
import java.time.LocalDateTime; | ||
import java.util.List; | ||
import java.util.Map; | ||
import java.util.Optional; | ||
|
@@ -184,4 +185,11 @@ public interface JobPersistence { | |
*/ | ||
void importDatabase(String airbyteVersion, Map<DatabaseSchema, Stream<JsonNode>> data) throws IOException; | ||
|
||
  /**
   * Purges job history while ensuring that the latest saved-state information is maintained.
   *
   * @param asOfDate the reference "now" used when evaluating job age against the retention window
   * @throws IOException if the persistence layer fails while deleting job history
   */
  void purgeJobHistory(LocalDateTime asOfDate) throws IOException;
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Feels a bit strange to expose this as part of the interface when in practice Maybe it makes more sense to have There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Good direction, and especially helps with calling this from the scheduler. Fixed. |
||
|
||
} |
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -38,6 +38,7 @@ | |
import com.google.common.collect.Lists; | ||
import com.google.common.collect.Sets; | ||
import io.airbyte.commons.json.Jsons; | ||
import io.airbyte.commons.text.Sqls; | ||
import io.airbyte.config.JobConfig; | ||
import io.airbyte.config.JobConfig.ConfigType; | ||
import io.airbyte.config.JobGetSpecConfig; | ||
|
@@ -58,6 +59,8 @@ | |
import java.nio.file.Path; | ||
import java.sql.SQLException; | ||
import java.time.Instant; | ||
import java.time.LocalDateTime; | ||
import java.time.ZoneOffset; | ||
import java.util.ArrayList; | ||
import java.util.Collections; | ||
import java.util.HashMap; | ||
|
@@ -79,6 +82,8 @@ | |
import org.junit.jupiter.api.DisplayName; | ||
import org.junit.jupiter.api.Nested; | ||
import org.junit.jupiter.api.Test; | ||
import org.junit.jupiter.params.ParameterizedTest; | ||
import org.junit.jupiter.params.provider.CsvSource; | ||
import org.testcontainers.containers.PostgreSQLContainer; | ||
import org.testcontainers.utility.MountableFile; | ||
|
||
|
@@ -1002,6 +1007,174 @@ void testResetJobCancelled() throws IOException { | |
assertEquals(JobStatus.CANCELLED, updated.getStatus()); | ||
} | ||
|
||
private Job persistJobForTesting(String scope, JobConfig jobConfig, JobStatus status, LocalDateTime runDate) throws IOException, SQLException { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Could this be simplified with There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. This doesn't appear to be convenient in any way, as enqueueJob doesn't allow me to set the created/updated dates myself, which is the primary use case of this test scenario. I've wrapped the persistJobForJobHistoryTesting() into the right section of the test case code, so it's clear what it goes with, but I definitely have to keep a separate method of creating job history in bulk. If there's currently no use for that in the primary code, it may as well remain just in the limited test case scope, but it could get moved over if there's a real use case for it. |
||
String when = runDate.toString(); | ||
Optional<Long> id = database.query( | ||
ctx -> ctx.fetch( | ||
"INSERT INTO jobs(config_type, scope, created_at, updated_at, status, config) " + | ||
"SELECT CAST(? AS JOB_CONFIG_TYPE), ?, ?, ?, CAST(? AS JOB_STATUS), CAST(? as JSONB) " + | ||
"RETURNING id ", | ||
Sqls.toSqlName(jobConfig.getConfigType()), | ||
scope, | ||
runDate, | ||
runDate, | ||
Sqls.toSqlName(status), | ||
Jsons.serialize(jobConfig))) | ||
.stream() | ||
.findFirst() | ||
.map(r -> r.getValue("id", Long.class)); | ||
return jobPersistence.getJob(id.get()); | ||
} | ||
|
||
private int persistAttemptForJobHistoryTesting(Job job, String logPath, LocalDateTime runDate, boolean shouldHaveState) | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. similar to above |
||
throws IOException, SQLException { | ||
String attemptOutputWithState = "{\n" | ||
+ " \"sync\": {\n" | ||
+ " \"state\": {\n" | ||
+ " \"state\": {\n" | ||
+ " \"bookmarks\": {" | ||
+ "}}}}}"; | ||
String attemptOutputWithoutState = "{\n" | ||
+ " \"sync\": {\n" | ||
+ " \"output_catalog\": {" | ||
+ "}}}"; | ||
Integer attemptNumber = database.query(ctx -> ctx.fetch( | ||
"INSERT INTO attempts(job_id, attempt_number, log_path, status, created_at, updated_at, output) " | ||
+ "VALUES(?, ?, ?, CAST(? AS ATTEMPT_STATUS), ?, ?, CAST(? as JSONB)) RETURNING attempt_number", | ||
job.getId(), | ||
job.getAttemptsCount(), | ||
logPath, | ||
Sqls.toSqlName(AttemptStatus.FAILED), | ||
runDate, | ||
runDate, | ||
shouldHaveState ? attemptOutputWithState : attemptOutputWithoutState) | ||
.stream() | ||
.findFirst() | ||
.map(r -> r.get("attempt_number", Integer.class)) | ||
.orElseThrow(() -> new RuntimeException("This should not happen"))); | ||
return attemptNumber; | ||
} | ||
|
||
/** | ||
* Testing job history deletion is sensitive to exactly how the constants are configured for | ||
* controlling deletion logic. Thus, the test case injects overrides for those constants, testing a | ||
* comprehensive set of combinations to make sure that the logic is robust to reasonable | ||
* configurations. Extreme configurations such as zero-day retention period are not covered. | ||
* | ||
* Business rules for deletions. 1. Job must be older than X days or its conn has excessive number | ||
* of jobs 2. Job cannot be one of the last N jobs on that conn (last N jobs are always kept). 3. | ||
* Job cannot be holding the most recent saved state (most recent saved state is always kept). | ||
* | ||
* Testing Goal: Set up jobs according to the parameters passed in. Then delete according to the | ||
* rules, and make sure the right number of jobs are left. Against one connection/scope, | ||
* <ol> | ||
* <li>Setup: create a history of jobs that goes back many days (but produces no more than one job a | ||
* day)</li> | ||
* <li>Setup: the most recent job with state in it should be at least N jobs back</li> | ||
* <li>Assert: ensure that after purging, there are the right number of jobs left (and at least min | ||
* recency), including the one with the most recent state.</li> | ||
* <li>Assert: ensure that after purging, there are the right number of jobs left (and at least min | ||
* recency), including the X most recent</li> | ||
* <li>Assert: ensure that after purging, all other job history has been deleted.</li> | ||
* </ol> | ||
* | ||
* @param numJobs How many test jobs to generate; make this enough that all other parameters are | ||
* fully included, for predictable results. | ||
* @param tooManyJobs Takes the place of DefaultJobPersistence.JOB_HISTORY_EXCESSIVE_NUMBER_OF_JOBS | ||
* - how many jobs are needed before it ignores date-based age of job when doing deletions. | ||
* @param ageCutoff Takes the place of DefaultJobPersistence.JOB_HISTORY_MINIMUM_AGE_IN_DAYS - | ||
* retention period in days for the most recent jobs; older than this gets deleted. | ||
* @param recencyCutoff Takes the place of DefaultJobPersistence.JOB_HISTORY_MINIMUM_RECENCY - | ||
* retention period in number of jobs; at least this many jobs will be retained after | ||
* deletion (provided enough existed in the first place). | ||
* @param lastStatePosition How far back in the list is the job with the latest saved state. This | ||
* can be manipulated to have the saved-state job inside or prior to the retention period. | ||
* @param expectedAfterPurge How many matching jobs are expected after deletion, given the input | ||
* parameters. This was calculated by a human based on understanding the requirements. | ||
* @param goalOfTestScenario Description of the purpose of that test scenario, so it's easier to | ||
* maintain and understand failures. | ||
* | ||
*/ | ||
@DisplayName("Should purge older job history but maintain certain more recent ones") | ||
@ParameterizedTest | ||
// Cols: numJobs, tooManyJobsCutoff, ageCutoff, recencyCutoff, lastSavedStatePosition, | ||
// expectedAfterPurge, description | ||
@CsvSource({ | ||
"50,100,10,5,9,10,'Validate age cutoff alone'", | ||
"50,100,10,5,13,11,'Validate saved state after age cutoff'", | ||
"50,100,10,15,9,15,'Validate recency cutoff alone'", | ||
"50,100,10,15,17,16,'Validate saved state after recency cutoff'", | ||
"50,20,30,10,9,10,'Validate excess jobs cutoff alone'", | ||
"50,20,30,10,25,11,'Validate saved state after excess jobs cutoff'", | ||
"50,20,30,20,9,20,'Validate recency cutoff with excess jobs cutoff'", | ||
"50,20,30,20,25,21,'Validate saved state after recency and excess jobs cutoff but before age'", | ||
"50,20,30,20,35,21,'Validate saved state after recency and excess jobs cutoff and after age'" | ||
}) | ||
void testPurgeJobHistory(int numJobs, | ||
int tooManyJobs, | ||
int ageCutoff, | ||
int recencyCutoff, | ||
int lastStatePosition, | ||
int expectedAfterPurge, | ||
String goalOfTestScenario) | ||
throws IOException, SQLException { | ||
final String CURRENT_SCOPE = UUID.randomUUID().toString(); | ||
|
||
// Decoys - these jobs will help mess up bad sql queries, even though they shouldn't be deleted. | ||
final String DECOY_SCOPE = UUID.randomUUID().toString(); | ||
|
||
// Reconfigure constants to test various combinations of tuning knobs and make sure all work. | ||
DefaultJobPersistence.JOB_HISTORY_MINIMUM_AGE_IN_DAYS = ageCutoff; | ||
DefaultJobPersistence.JOB_HISTORY_EXCESSIVE_NUMBER_OF_JOBS = tooManyJobs; | ||
DefaultJobPersistence.JOB_HISTORY_MINIMUM_RECENCY = recencyCutoff; | ||
|
||
LocalDateTime fakeNow = LocalDateTime.of(2021, 6, 20, 0, 0); | ||
|
||
// Jobs are created in reverse chronological order; id order is the inverse of old-to-new date | ||
// order. | ||
// The most-recent job is in allJobs[0] which means keeping the 10 most recent is [0-9], simplifying | ||
// testing math as we don't have to care how many jobs total existed and were deleted. | ||
List<Job> allJobs = new ArrayList<>(); | ||
List<Job> decoyJobs = new ArrayList<>(); | ||
for (int i = 0; i < numJobs; i++) { | ||
allJobs.add(persistJobForTesting(CURRENT_SCOPE, SYNC_JOB_CONFIG, JobStatus.FAILED, fakeNow.minusDays(i))); | ||
decoyJobs.add(persistJobForTesting(DECOY_SCOPE, SYNC_JOB_CONFIG, JobStatus.FAILED, fakeNow.minusDays(i))); | ||
} | ||
|
||
// At least one job should have state. Find the desired job and add state to it. | ||
Job lastJobWithState = addStateToJob(allJobs.get(lastStatePosition)); | ||
addStateToJob(decoyJobs.get(lastStatePosition - 1)); | ||
addStateToJob(decoyJobs.get(lastStatePosition + 1)); | ||
|
||
// An older job with state should also exist, so we ensure we picked the most-recent with queries. | ||
Job olderJobWithState = addStateToJob(allJobs.get(lastStatePosition + 1)); | ||
|
||
// sanity check that the attempt does have saved state so the purge history sql detects it correctly | ||
assertTrue(lastJobWithState.getAttempts().get(0).getOutput() != null, | ||
goalOfTestScenario + " - missing saved state on job that was supposed to have it."); | ||
|
||
// Execute the job history purge and check what jobs are left. | ||
jobPersistence.purgeJobHistory(fakeNow); | ||
List<Job> afterPurge = jobPersistence.listJobs(ConfigType.SYNC, CURRENT_SCOPE, 9999, 0); | ||
|
||
// Test - contains expected number of jobs and no more than that | ||
assertEquals(expectedAfterPurge, afterPurge.size(), goalOfTestScenario + " - Incorrect number of jobs remain after deletion."); | ||
|
||
// Test - most-recent are actually the most recent by date (see above, reverse order) | ||
for (int i = 0; i < Math.min(ageCutoff, recencyCutoff); i++) { | ||
assertEquals(allJobs.get(i).getId(), afterPurge.get(i).getId(), goalOfTestScenario + " - Incorrect sort order after deletion."); | ||
} | ||
|
||
// Test - job with latest state is always kept despite being older than some cutoffs | ||
assertTrue(afterPurge.contains(lastJobWithState), goalOfTestScenario + " - Missing last job with saved state after deletion."); | ||
} | ||
|
||
private Job addStateToJob(Job job) throws IOException, SQLException { | ||
persistAttemptForJobHistoryTesting(job, LOG_PATH.toString(), | ||
LocalDateTime.ofEpochSecond(job.getCreatedAtInSecond(), 0, ZoneOffset.UTC), true); | ||
return jobPersistence.getJob(job.getId()); // reload job to include its attempts | ||
} | ||
|
||
} | ||
|
||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We often avoid the pattern of dropping the `final` modifier. Instead, consider modifying the testing constructor `DefaultJobPersistence(Database database, Supplier<Instant> timeSupplier)` to accept these values as parameters, and have the non-testing constructor `public DefaultJobPersistence(Database database)` use the defaults from the `final` constants. What do you think?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This seems to work well. My style on it might still need a second check in review, as I'm not sure if I'm doing it exactly the way you intended.