Skip to content

Commit

Permalink
Elasticsearch search service related changes in other subsystems
Browse files Browse the repository at this point in the history
  • Loading branch information
gregorydlogan committed Feb 8, 2024
1 parent 27eb10e commit a49ba3c
Show file tree
Hide file tree
Showing 12 changed files with 126 additions and 202 deletions.
10 changes: 10 additions & 0 deletions modules/adopter-registration-impl/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -86,6 +86,16 @@
<groupId>org.osgi</groupId>
<artifactId>org.osgi.service.component.annotations</artifactId>
</dependency>
<dependency>
<groupId>org.elasticsearch</groupId>
<artifactId>elasticsearch</artifactId>
<version>${elasticsearch.version}</version>
</dependency>
<dependency>
<groupId>org.opencastproject</groupId>
<artifactId>opencast-dublincore</artifactId>
<version>${project.version}</version>
</dependency>
</dependencies>
<build>
<plugins>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,17 +28,15 @@
import org.opencastproject.adopter.statistic.dto.StatisticData;
import org.opencastproject.assetmanager.api.AssetManager;
import org.opencastproject.capture.admin.api.CaptureAgentStateService;
import org.opencastproject.mediapackage.MediaPackage;
import org.opencastproject.mediapackage.MediaPackageElementFlavor;
import org.opencastproject.search.api.SearchQuery;
import org.opencastproject.metadata.dublincore.DublinCore;
import org.opencastproject.metadata.dublincore.EncodingSchemeUtils;
import org.opencastproject.search.api.SearchResult;
import org.opencastproject.search.api.SearchResultItem;
import org.opencastproject.search.api.SearchResultList;
import org.opencastproject.search.api.SearchService;
import org.opencastproject.security.api.DefaultOrganization;
import org.opencastproject.security.api.Organization;
import org.opencastproject.security.api.OrganizationDirectoryService;
import org.opencastproject.security.api.SecurityService;
import org.opencastproject.security.api.UnauthorizedException;
import org.opencastproject.security.api.User;
import org.opencastproject.security.api.UserProvider;
import org.opencastproject.security.util.SecurityUtil;
Expand All @@ -48,6 +46,8 @@
import org.opencastproject.userdirectory.JpaUserAndRoleProvider;
import org.opencastproject.userdirectory.JpaUserReferenceProvider;

import org.elasticsearch.index.query.QueryBuilders;
import org.elasticsearch.search.builder.SearchSourceBuilder;
import org.osgi.framework.BundleContext;
import org.osgi.framework.Version;
import org.osgi.service.component.annotations.Activate;
Expand All @@ -58,7 +58,6 @@
import org.slf4j.LoggerFactory;

import java.io.IOException;
import java.util.Arrays;
import java.util.List;
import java.util.Objects;
import java.util.Timer;
Expand Down Expand Up @@ -292,14 +291,6 @@ private String collectStatisticData(String adopterKey, String statisticKey) thro
statisticData.setJobCount(serviceRegistry.count(null, null));

statisticData.setSeriesCount(seriesService.getSeriesCount());
SearchQuery sq = new SearchQuery();
sq.withId("");
sq.withElementTags(new String[0]);
sq.withElementFlavors(new MediaPackageElementFlavor[0]);
sq.signURLs(false);
sq.includeEpisodes(true);
sq.includeSeries(false);
sq.withLimit(SEARCH_ITERATION_SIZE);

List<Organization> orgs = organizationDirectoryService.getOrganizations();
statisticData.setTenantCount(orgs.size());
Expand All @@ -313,28 +304,18 @@ private String collectStatisticData(String adopterKey, String statisticKey) thro
int orgCAs = caStateService.getKnownAgents().size();
statisticData.setCACount(current + orgCAs);

//Calculate the total number of minutes for this org, add it to the total
current = statisticData.getTotalMinutes();
long orgDuration = 0L;
long total = 0;
int offset = 0;
try {
do {
sq.withOffset(offset);
SearchResult sr = searchService.getForAdministrativeRead(sq);
offset += SEARCH_ITERATION_SIZE;
total = sr.getTotalSize();
orgDuration = Arrays.stream(sr.getItems())
.map(SearchResultItem::getMediaPackage)
.map(MediaPackage::getDuration)
.mapToLong(Long::valueOf)
.sum() / 1000L;
} while (offset + SEARCH_ITERATION_SIZE <= total);
} catch (UnauthorizedException e) {
//This should never happen, but...
logger.warn("Unable to calculate total minutes, unauthorized");
}
statisticData.setTotalMinutes(current + orgDuration);
final SearchSourceBuilder q = new SearchSourceBuilder().query(
QueryBuilders.boolQuery()
.must(QueryBuilders.termQuery(SearchResult.TYPE, SearchService.IndexEntryType.Episode))
.must(QueryBuilders.termQuery(SearchResult.ORG, org.getId()))
.must(QueryBuilders.termQuery(SearchResult.DELETED_DATE, "null")));
final SearchResultList results = searchService.search(q);
long orgMilis = results.getHits().stream().map(
result -> EncodingSchemeUtils.decodeDuration(
result.getDublinCore().getFirst(DublinCore.PROPERTY_EXTENT)))
.filter(Objects::nonNull)
.reduce(Long::sum).orElse(0L);
statisticData.setTotalMinutes(statisticData.getTotalMinutes() + (orgMilis / 1000 / 60));

//Add the users for each org
long currentUsers = statisticData.getUserCount();
Expand Down
4 changes: 4 additions & 0 deletions modules/conductor/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -89,6 +89,10 @@
<groupId>commons-io</groupId>
<artifactId>commons-io</artifactId>
</dependency>
<dependency>
<groupId>org.apache.commons</groupId>
<artifactId>commons-lang3</artifactId>
</dependency>
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-api</artifactId>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,9 +43,6 @@
import org.opencastproject.metadata.dublincore.DublinCoreCatalogService;
import org.opencastproject.metadata.dublincore.DublinCoreUtil;
import org.opencastproject.search.api.SearchException;
import org.opencastproject.search.api.SearchQuery;
import org.opencastproject.search.api.SearchResult;
import org.opencastproject.search.api.SearchResultItem;
import org.opencastproject.search.api.SearchService;
import org.opencastproject.security.api.AclScope;
import org.opencastproject.security.api.AuthorizationService;
Expand Down Expand Up @@ -209,18 +206,15 @@ public void handleEvent(final SeriesItem seriesItem) {
try {
securityService.setUser(SecurityUtil.createSystemUser(systemAccount, prevOrg));

SearchQuery q = new SearchQuery().withSeriesId(seriesId).withLimit(-1);
SearchResult result = searchService.getForAdministrativeRead(q);

for (SearchResultItem item : result.getItems()) {
MediaPackage mp = item.getMediaPackage();
Organization org = organizationDirectoryService.getOrganization(item.getOrganization());
for (var seriesData: searchService.getSeries(seriesId)) {
var mp = seriesData.getRight();
Organization org = seriesData.getLeft();
securityService.setOrganization(org);

// If the security policy has been updated, make sure to distribute that change
// to the distribution channels as well
if (SeriesItem.Type.UpdateAcl.equals(seriesItem.getType())) {
if (seriesItem.getOverrideEpisodeAcl()) {
if (Boolean.TRUE.equals(seriesItem.getOverrideEpisodeAcl())) {

MediaPackageElement[] distributedEpisodeAcls = mp.getElementsByFlavor(XACML_POLICY_EPISODE);
authorizationService.removeAcl(mp, AclScope.Episode);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,10 +35,10 @@
import org.opencastproject.mediapackage.Publication;
import org.opencastproject.mediapackage.selector.SimpleElementSelector;
import org.opencastproject.search.api.SearchException;
import org.opencastproject.search.api.SearchQuery;
import org.opencastproject.search.api.SearchResult;
import org.opencastproject.search.api.SearchService;
import org.opencastproject.security.api.UnauthorizedException;
import org.opencastproject.serviceregistry.api.ServiceRegistry;
import org.opencastproject.util.NotFoundException;
import org.opencastproject.workflow.api.WorkflowInstance;
import org.opencastproject.workflow.api.WorkflowOperationException;
import org.opencastproject.workflow.api.WorkflowOperationHandler;
Expand All @@ -53,10 +53,8 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.Collection;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.stream.Collectors;

/**
* Workflow operation for retracting parts of a media package from the engage player.
Expand Down Expand Up @@ -118,45 +116,36 @@ public WorkflowOperationResult start(WorkflowInstance workflowInstance, JobConte
logger.info("Partially retracting {}", mediaPackage.getIdentifier());
List<Job> jobs;
try {
SearchQuery query = new SearchQuery().withId(mediaPackage.getIdentifier().toString());
query.includeEpisodes(true);
query.includeSeries(false);
SearchResult result = searchService.getByQuery(query);
if (result.size() == 0) {
logger.info("The search service doesn't know mediapackage {}", mediaPackage);
try {
searchMP = searchService.get(mediaPackage.getIdentifier().toString());
} catch (NotFoundException e) {
logger.info("The search service doesn't know media package {}", mediaPackage);
return createResult(mediaPackage, WorkflowOperationResult.Action.SKIP);
} else if (result.size() > 1) {
logger.warn("More than one mediapackage with id {} returned from search service", mediaPackage.getIdentifier());
throw new WorkflowOperationException(
"More than one mediapackage with id " + mediaPackage.getIdentifier() + " found");
} else {
searchMP = result.getItems()[0].getMediaPackage();
Set<String> retractElementIds = new HashSet<>();
Collection<MediaPackageElement> retractElements = retractElementSelector.select(searchMP, false);
Publication publicationElement = findPublicationElement(mediaPackage);

//Pull down the elements themselves
logger.debug("Found {} matching elements", retractElements.size());
for (MediaPackageElement element : retractElements) {
retractElementIds.add(element.getIdentifier());
logger.debug("Retracting {}", element.getIdentifier());
}
jobs = retractElements(retractElementIds, searchMP);
if (jobs.size() < 1) {
logger.debug("No matching elements found");
return createResult(mediaPackage, WorkflowOperationResult.Action.CONTINUE);
}
} catch (UnauthorizedException e) {
throw new WorkflowOperationException("Not allowed to access media package " + mediaPackage);
}
var retractElements = retractElementSelector.select(searchMP, false);
var retractElementIds = retractElements.stream()
.map(MediaPackageElement::getIdentifier)
.collect(Collectors.toSet());
logger.debug("Found {} matching elements", retractElementIds.size());

// Pull down the elements themselves
jobs = retractElements(retractElementIds, searchMP);
if (jobs.size() < 1) {
logger.debug("No matching elements found");
return createResult(mediaPackage, WorkflowOperationResult.Action.CONTINUE);
}

for (MediaPackageElement element : retractElements) {
logger.debug("Removing {} from mediapackage", element.getIdentifier());
//Remove the element from the workflow mp
mediaPackage.remove(element);
searchMP.remove(element);
}
for (MediaPackageElement element : retractElements) {
logger.debug("Removing {} from mediapackage", element.getIdentifier());
//Remove the element from the workflow mp
mediaPackage.remove(element);
searchMP.remove(element);
}

// Wait for retraction to finish
if (!waitForStatus(jobs.toArray(new Job[jobs.size()])).isSuccess()) {
if (!waitForStatus(jobs.toArray(new Job[0])).isSuccess()) {
throw new WorkflowOperationException("The retract jobs did not complete successfully");
}
Job deleteSearchJob = null;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -52,8 +52,6 @@
import org.opencastproject.metadata.dublincore.DublinCoreValue;
import org.opencastproject.metadata.dublincore.DublinCoreXmlFormat;
import org.opencastproject.search.api.SearchException;
import org.opencastproject.search.api.SearchQuery;
import org.opencastproject.search.api.SearchResult;
import org.opencastproject.search.api.SearchService;
import org.opencastproject.security.api.Organization;
import org.opencastproject.security.api.OrganizationDirectoryService;
Expand Down Expand Up @@ -265,7 +263,14 @@ public WorkflowOperationResult start(final WorkflowInstance workflowInstance, Jo

// First check if mp exists in the search index and strategy is merge
// to avoid leaving distributed elements around.
MediaPackage distributedMp = getDistributedMediapackage(mediaPackage.getIdentifier().toString());
MediaPackage distributedMp = null;
try {
distributedMp = searchService.get(mediaPackage.getIdentifier().toString());
} catch (NotFoundException e) {
logger.debug("No published mediapackage found for {}", mediaPackage.getIdentifier().toString());
} catch (UnauthorizedException e) {
throw new WorkflowOperationException("Unauthorized for " + mediaPackage.getIdentifier().toString(), e);
}
if (PUBLISH_STRATEGY_MERGE.equals(republishStrategy) && distributedMp == null) {
logger.info("Skipping republish for {} since it is not currently published",
mediaPackage.getIdentifier().toString());
Expand Down Expand Up @@ -691,23 +696,13 @@ private void applyTags(MediaPackageElement element, String[] tags) {
}
}

protected MediaPackage getDistributedMediapackage(String mediaPackageID) throws WorkflowOperationException {
MediaPackage mediaPackage = null;
SearchQuery query = new SearchQuery().withId(mediaPackageID);
query.includeEpisodes(true);
query.includeSeries(false);
SearchResult result = searchService.getByQuery(query);
if (result.size() == 0) {
logger.info("The search service doesn't know mediapackage {}.", mediaPackageID);
return mediaPackage; // i.e. null
} else if (result.size() > 1) {
logger.warn("More than one mediapackage with id {} returned from search service", mediaPackageID);
throw new WorkflowOperationException("More than one mediapackage with id " + mediaPackageID + " found");
} else {
// else, merge the new with the existing (new elements will overwrite existing elements)
mediaPackage = result.getItems()[0].getMediaPackage();
protected MediaPackage getDistributedMediaPackage(String mediaPackageID) throws UnauthorizedException {
try {
return searchService.get(mediaPackageID);
} catch (NotFoundException e) {
logger.info("The search service doesn't know media package {}.", mediaPackageID);
return null;
}
return mediaPackage;
}


Expand Down Expand Up @@ -778,8 +773,8 @@ private void removePublicationElement(MediaPackage mediaPackage) {
* @throws WorkflowOperationException
*/
private void retractFromEngage(MediaPackage distributedMediaPackage) throws WorkflowOperationException {
List<Job> jobs = new ArrayList<Job>();
Set<String> elementIds = new HashSet<String>();
List<Job> jobs = new ArrayList<>();
Set<String> elementIds = new HashSet<>();
try {
if (distributedMediaPackage != null) {

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,10 +31,9 @@
import org.opencastproject.mediapackage.MediaPackage;
import org.opencastproject.mediapackage.MediaPackageElement;
import org.opencastproject.mediapackage.Publication;
import org.opencastproject.search.api.SearchQuery;
import org.opencastproject.search.api.SearchResult;
import org.opencastproject.search.api.SearchService;
import org.opencastproject.serviceregistry.api.ServiceRegistry;
import org.opencastproject.util.NotFoundException;
import org.opencastproject.workflow.api.AbstractWorkflowOperationHandler;
import org.opencastproject.workflow.api.WorkflowInstance;
import org.opencastproject.workflow.api.WorkflowOperationException;
Expand All @@ -50,9 +49,10 @@
import org.slf4j.LoggerFactory;

import java.util.ArrayList;
import java.util.HashSet;
import java.util.Arrays;
import java.util.List;
import java.util.Set;
import java.util.stream.Collectors;

/**
* Workflow operation for retracting a media package from the engage player.
Expand Down Expand Up @@ -166,27 +166,21 @@ public WorkflowOperationResult start(WorkflowInstance workflowInstance, JobConte
MediaPackage mediaPackage = workflowInstance.getMediaPackage();
List<Job> jobs;
try {
SearchQuery query = new SearchQuery().withId(mediaPackage.getIdentifier().toString());
SearchResult result = searchService.getByQuery(query);
if (result.size() == 0) {
logger.info("The search service doesn't know mediapackage {}", mediaPackage);
MediaPackage searchMediaPackage = null;
try {
searchMediaPackage = searchService.get(mediaPackage.getIdentifier().toString());
} catch (NotFoundException e) {
logger.info("The search service doesn't know media package {}", mediaPackage);
return createResult(mediaPackage, Action.SKIP);
} else if (result.size() > 1) {
logger.warn("More than one mediapackage with id {} returned from search service", mediaPackage.getIdentifier());
throw new WorkflowOperationException("More than one mediapackage with id " + mediaPackage.getIdentifier()
+ " found");
} else {
Set<String> retractElementIds = new HashSet<String>();
MediaPackage searchMediaPackage = result.getItems()[0].getMediaPackage();
logger.info("Retracting media package {} from download/streaming distribution channel", searchMediaPackage);
for (MediaPackageElement element : searchMediaPackage.getElements()) {
retractElementIds.add(element.getIdentifier());
}
jobs = retractElements(retractElementIds, searchMediaPackage);
}
logger.info("Retracting media package {} from download/streaming distribution channel", searchMediaPackage);
var retractElementIds = Arrays.stream(searchMediaPackage.getElements())
.map(MediaPackageElement::getIdentifier)
.collect(Collectors.toSet());
jobs = retractElements(retractElementIds, searchMediaPackage);

// Wait for retraction to finish
if (!waitForStatus(jobs.toArray(new Job[jobs.size()])).isSuccess()) {
if (!waitForStatus(jobs.toArray(new Job[0])).isSuccess()) {
throw new WorkflowOperationException("One of the download/streaming retract job did not complete successfully");
}

Expand Down
Loading

0 comments on commit a49ba3c

Please sign in to comment.