Skip to content

Commit

Permalink
Provide unique id for each rollup job and add debug logs
Browse files Browse the repository at this point in the history
Signed-off-by: bowenlan-amzn <bowenlan23@gmail.com>
  • Loading branch information
bowenlan-amzn committed Oct 1, 2023
1 parent a2dd769 commit e45bbe3
Show file tree
Hide file tree
Showing 9 changed files with 44 additions and 18 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -196,19 +196,19 @@ class RollupMetadataService(val client: Client, val xContentRegistry: NamedXCont

// Get the doc value field of the dateHistogram.sourceField for the first search hit converted to epoch millis
// If the doc value is null or empty it will be treated the same as empty doc hits
val firstHitTimestampAsString: String? = response.hits.hits.first().field(dateHistogram.sourceField).getValue<String>()
val firstHitTimestampAsString: String = response.hits.hits.first().field(dateHistogram.sourceField).getValue<String>()
?: return StartingTimeResult.NoDocumentsFound
// Parse date and extract epochMillis
val formatter = DateFormatter.forPattern(DATE_FIELD_STRICT_DATE_OPTIONAL_TIME_FORMAT)
val epochMillis = DateFormatters.from(formatter.parse(firstHitTimestampAsString), formatter.locale()).toInstant().toEpochMilli()
return StartingTimeResult.Success(getRoundedTime(epochMillis, dateHistogram))
} catch (e: RemoteTransportException) {
val unwrappedException = ExceptionsHelper.unwrapCause(e) as Exception
logger.debug("Error when getting initial start time for rollup [${rollup.id}]: $unwrappedException")
logger.error("Error when getting initial start time for rollup [{}]: {}", rollup.id, unwrappedException)
return StartingTimeResult.Failure(unwrappedException)
} catch (e: Exception) {
// TODO: Catching general exceptions for now, can make more granular
logger.debug("Error when getting initial start time for rollup [${rollup.id}]: $e")
logger.error("Error when getting initial start time for rollup [{}]: {}", rollup.id, e)
return StartingTimeResult.Failure(e)
}
}
Expand Down Expand Up @@ -261,7 +261,7 @@ class RollupMetadataService(val client: Client, val xContentRegistry: NamedXCont
}

// This updates the metadata for a continuous rollup after an execution of the composite search and ingestion of rollup data
fun getUpdatedContinuousMetadata(
private fun getUpdatedContinuousMetadata(
rollup: Rollup,
metadata: RollupMetadata,
internalComposite: InternalComposite
Expand Down Expand Up @@ -305,11 +305,11 @@ class RollupMetadataService(val client: Client, val xContentRegistry: NamedXCont
} else MetadataResult.NoMetadata
} catch (e: RemoteTransportException) {
val unwrappedException = ExceptionsHelper.unwrapCause(e) as Exception
logger.debug("$errorMessage: $unwrappedException")
logger.error("$errorMessage: $unwrappedException")
return MetadataResult.Failure(errorMessage, unwrappedException)
} catch (e: Exception) {
// TODO: Catching general exceptions for now, can make more granular
logger.debug("$errorMessage: $e")
logger.error("$errorMessage: $e")
return MetadataResult.Failure(errorMessage, e)
}
}
Expand Down Expand Up @@ -389,6 +389,7 @@ class RollupMetadataService(val client: Client, val xContentRegistry: NamedXCont
failureReason = "The create metadata call failed with a ${response.result?.lowercase} result"
}
}
logger.debug("Metadata update successful {}", metadata)
// TODO: Is seqno/prim and id returned for all?
return MetadataResult.Success(
metadata.copy(
Expand All @@ -401,9 +402,11 @@ class RollupMetadataService(val client: Client, val xContentRegistry: NamedXCont
)
} catch (e: RemoteTransportException) {
val unwrappedException = ExceptionsHelper.unwrapCause(e) as Exception
logger.error("Metadata update failed ${metadata.rollupID}", unwrappedException)
return MetadataResult.Failure(errorMessage, unwrappedException)
} catch (e: Exception) {
// TODO: Catching general exceptions for now, can make more granular
logger.error("Metadata update failed ${metadata.rollupID}", e)
return MetadataResult.Failure(errorMessage, e)
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -193,6 +193,7 @@ object RollupRunner :
* */
@Suppress("ReturnCount", "NestedBlockDepth", "ComplexMethod", "LongMethod", "ThrowsCount")
private suspend fun runRollupJob(job: Rollup, context: JobExecutionContext, lock: LockModel) {
logger.debug("Running rollup job [${job.id}]")
var updatableLock = lock
try {
when (val jobValidity = isJobValid(job)) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,6 @@ import org.opensearch.indexmanagement.util._SEQ_NO
import org.opensearch.indexmanagement.waitFor
import org.opensearch.jobscheduler.spi.schedule.IntervalSchedule
import org.opensearch.core.rest.RestStatus
import org.opensearch.test.OpenSearchTestCase
import java.time.Duration
import java.time.Instant

Expand Down Expand Up @@ -112,7 +111,7 @@ abstract class RollupRestTestCase : IndexManagementRestTestCase() {

protected fun createRollup(
rollup: Rollup,
rollupId: String = OpenSearchTestCase.randomAlphaOfLength(10),
rollupId: String,
refresh: Boolean = true,
client: RestClient? = null
): Rollup {
Expand Down Expand Up @@ -150,7 +149,7 @@ abstract class RollupRestTestCase : IndexManagementRestTestCase() {

protected fun createRandomRollup(refresh: Boolean = true): Rollup {
val rollup = randomRollup()
val rollupId = createRollup(rollup, refresh = refresh).id
val rollupId = createRollup(rollup, rollupId = rollup.id, refresh = refresh).id
return getRollup(rollupId = rollupId)
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,10 +12,12 @@ import org.opensearch.indexmanagement.makeRequest
import org.opensearch.indexmanagement.rollup.RollupRestTestCase
import org.opensearch.core.rest.RestStatus
import org.opensearch.test.junit.annotations.TestLogging
import java.util.Locale

@TestLogging(value = "level:DEBUG", reason = "Debugging tests")
@Suppress("UNCHECKED_CAST")
class RestDeleteRollupActionIT : RollupRestTestCase() {

@Throws(Exception::class)
fun `test deleting a rollup`() {
val rollup = createRandomRollup()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,14 +13,17 @@ import org.opensearch.indexmanagement.rollup.action.get.GetRollupsRequest.Compan
import org.opensearch.indexmanagement.rollup.randomRollup
import org.opensearch.core.rest.RestStatus
import org.opensearch.test.junit.annotations.TestLogging
import java.util.Locale

@TestLogging(value = "level:DEBUG", reason = "Debugging tests")
@Suppress("UNCHECKED_CAST")
class RestGetRollupActionIT : RollupRestTestCase() {

private val testName = javaClass.simpleName.lowercase(Locale.ROOT)

@Throws(Exception::class)
fun `test getting a rollup`() {
var rollup = createRollup(randomRollup())
var rollup = createRollup(randomRollup(), rollupId = "${testName}-1")
val indexedRollup = getRollup(rollup.id)
// Schema version and last updated time are updated during the creation so we need to update the original too for comparison
// Job schedule interval will have a dynamic start time
Expand Down Expand Up @@ -48,7 +51,7 @@ class RestGetRollupActionIT : RollupRestTestCase() {

@Throws(Exception::class)
fun `test getting all rollups`() {
val rollups = randomList(1, 15) { createRollup(randomRollup()) }
val rollups = randomList(1, 15) { createRollup(randomRollup() , rollupId = "${testName}-2") }

// Using a larger response size than the default in case leftover rollups prevent the ones created in this test from being returned
val res = client().makeRequest("GET", "$ROLLUP_JOBS_BASE_URI?size=100")
Expand Down Expand Up @@ -91,7 +94,7 @@ class RestGetRollupActionIT : RollupRestTestCase() {
fun `test changing response size when getting rollups`() {
// Ensure at least more rollup jobs than the default (20) exists
val rollupCount = 25
repeat(rollupCount) { createRollup(randomRollup()) }
repeat(rollupCount) { createRollup(randomRollup(), rollupId = "${testName}-3") }

// The default response size is 20, so even though 25 rollup jobs were made, at most 20 will be returned
var res = client().makeRequest("GET", ROLLUP_JOBS_BASE_URI)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,11 +31,14 @@ import org.opensearch.indexmanagement.util._SEQ_NO
import org.opensearch.core.rest.RestStatus
import org.opensearch.test.OpenSearchTestCase
import org.opensearch.test.junit.annotations.TestLogging
import java.util.Locale

@TestLogging(value = "level:DEBUG", reason = "Debugging tests")
@Suppress("UNCHECKED_CAST")
class RestIndexRollupActionIT : RollupRestTestCase() {

private val testName = javaClass.simpleName.lowercase(Locale.ROOT)

@Throws(Exception::class)
fun `test creating a rollup`() {
val rollup = randomRollup()
Expand Down Expand Up @@ -213,7 +216,7 @@ class RestIndexRollupActionIT : RollupRestTestCase() {
Dimension.Type.TERMS -> (it as Terms).copy(targetField = "some_other_target_field")
}
}
val rollup = createRollup(rollup = randomRollup().copy(dimensions = dimensions))
val rollup = createRollup(rollup = randomRollup().copy(dimensions = dimensions), rollupId = "${testName}-1")
client().makeRequest(
"PUT",
"$ROLLUP_JOBS_BASE_URI/${rollup.id}?refresh=true&if_seq_no=${rollup.seqNo}&if_primary_term=${rollup.primaryTerm}",
Expand Down Expand Up @@ -254,7 +257,7 @@ class RestIndexRollupActionIT : RollupRestTestCase() {
}
)
}
val rollup = createRollup(rollup = randomRollup().copy(metrics = metrics))
val rollup = createRollup(rollup = randomRollup().copy(metrics = metrics) , rollupId = "${testName}-1")
client().makeRequest(
"PUT",
"$ROLLUP_JOBS_BASE_URI/${rollup.id}?refresh=true&if_seq_no=${rollup.seqNo}&if_primary_term=${rollup.primaryTerm}",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,12 +23,15 @@ import org.opensearch.jobscheduler.spi.schedule.IntervalSchedule
import org.opensearch.core.rest.RestStatus
import java.time.Instant
import java.time.temporal.ChronoUnit
import java.util.Locale

class RestStartRollupActionIT : RollupRestTestCase() {

private val testName = javaClass.simpleName.lowercase(Locale.ROOT)

@Throws(Exception::class)
fun `test starting a stopped rollup`() {
val rollup = createRollup(randomRollup().copy(enabled = false, jobEnabledTime = null, metadataID = null))
val rollup = createRollup(randomRollup().copy(enabled = false, jobEnabledTime = null, metadataID = null), rollupId = "${testName}-1")
assertTrue("Rollup was not disabled", !rollup.enabled)

val response = client().makeRequest("POST", "$ROLLUP_JOBS_BASE_URI/${rollup.id}/_start")
Expand All @@ -44,7 +47,7 @@ class RestStartRollupActionIT : RollupRestTestCase() {
@Throws(Exception::class)
fun `test starting a started rollup doesnt change enabled time`() {
// First create a non-started rollup
val rollup = createRollup(randomRollup().copy(enabled = false, jobEnabledTime = null, metadataID = null))
val rollup = createRollup(randomRollup().copy(enabled = false, jobEnabledTime = null, metadataID = null), rollupId = "${testName}-2")
assertTrue("Rollup was not disabled", !rollup.enabled)

// Enable it to get the job enabled time
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,9 +26,12 @@ import org.opensearch.jobscheduler.spi.schedule.IntervalSchedule
import org.opensearch.core.rest.RestStatus
import java.time.Instant
import java.time.temporal.ChronoUnit
import java.util.Locale

class RestStopRollupActionIT : RollupRestTestCase() {

private val testName = javaClass.simpleName.lowercase(Locale.ROOT)

@After
fun clearIndicesAfterEachTest() {
// Flaky could happen if config index not deleted
Expand All @@ -39,7 +42,7 @@ class RestStopRollupActionIT : RollupRestTestCase() {

@Throws(Exception::class)
fun `test stopping a started rollup`() {
val rollup = createRollup(randomRollup().copy(enabled = true, jobEnabledTime = randomInstant(), metadataID = null))
val rollup = createRollup(randomRollup().copy(enabled = true, jobEnabledTime = randomInstant(), metadataID = null), rollupId = "${testName}-1")
assertTrue("Rollup was not enabled", rollup.enabled)

val response = client().makeRequest("POST", "$ROLLUP_JOBS_BASE_URI/${rollup.id}/_stop")
Expand All @@ -53,7 +56,7 @@ class RestStopRollupActionIT : RollupRestTestCase() {

@Throws(Exception::class)
fun `test stopping a stopped rollup`() {
val rollup = createRollup(randomRollup().copy(enabled = true, jobEnabledTime = randomInstant(), metadataID = null))
val rollup = createRollup(randomRollup().copy(enabled = true, jobEnabledTime = randomInstant(), metadataID = null), rollupId = "${testName}-2")
assertTrue("Rollup was not enabled", rollup.enabled)

val response = client().makeRequest("POST", "$ROLLUP_JOBS_BASE_URI/${rollup.id}/_stop")
Expand Down Expand Up @@ -85,6 +88,7 @@ class RestStopRollupActionIT : RollupRestTestCase() {
jobEnabledTime = Instant.now(),
metadataID = null
)
, rollupId = "${testName}-3"
)
createRollupSourceIndex(rollup)
updateRollupStartTime(rollup)
Expand Down Expand Up @@ -158,6 +162,7 @@ class RestStopRollupActionIT : RollupRestTestCase() {
jobEnabledTime = Instant.now(),
metadataID = null
)
, rollupId = "${testName}-4"
)

// Force rollup to execute which should fail as we did not create a source index
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,9 +34,12 @@ import org.opensearch.core.rest.RestStatus
import java.time.Instant
import java.time.temporal.ChronoUnit
import java.util.Collections.emptyMap
import java.util.Locale

class RollupRunnerIT : RollupRestTestCase() {

private val testName = javaClass.simpleName.lowercase(Locale.ROOT)

fun `test metadata is created for rollup job when none exists`() {
val indexName = "test_index_runner_first"

Expand Down Expand Up @@ -148,6 +151,7 @@ class RollupRunnerIT : RollupRestTestCase() {

// Define the rollup job
var rollup = randomRollup().copy(
id = "${testName}-1",
enabled = true,
jobSchedule = IntervalSchedule(Instant.now(), 1, ChronoUnit.MINUTES),
jobEnabledTime = Instant.now(),
Expand Down Expand Up @@ -270,6 +274,7 @@ class RollupRunnerIT : RollupRestTestCase() {

// Define rollup
var rollup = randomRollup().copy(
id = "${testName}-2",
enabled = true,
jobSchedule = IntervalSchedule(Instant.now(), 1, ChronoUnit.MINUTES),
jobEnabledTime = Instant.now(),
Expand Down Expand Up @@ -328,6 +333,7 @@ class RollupRunnerIT : RollupRestTestCase() {

// Define rollup
var rollup = randomRollup().copy(
id = "${testName}-3",
enabled = true,
jobSchedule = IntervalSchedule(Instant.now(), 1, ChronoUnit.MINUTES),
jobEnabledTime = Instant.now(),
Expand Down Expand Up @@ -640,6 +646,7 @@ class RollupRunnerIT : RollupRestTestCase() {
val delay: Long = 15000
// Define rollup
var rollup = randomRollup().copy(
id = "${testName}-4",
enabled = true,
jobSchedule = IntervalSchedule(Instant.now(), 1, ChronoUnit.MINUTES),
jobEnabledTime = Instant.now(),
Expand Down

0 comments on commit e45bbe3

Please sign in to comment.