-
Notifications
You must be signed in to change notification settings - Fork 6
/
ExportLogsAsFileTaskExecutor.java
167 lines (147 loc) · 6.89 KB
/
ExportLogsAsFileTaskExecutor.java
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
/*
* Made with all the love in the world
* by scireum in Remshalden, Germany
*
* Copyright by scireum GmbH
* http://www.scireum.de - info@scireum.de
*/
package sirius.biz.process;
import com.fasterxml.jackson.databind.node.ObjectNode;
import sirius.biz.analytics.reports.Cells;
import sirius.biz.cluster.work.DistributedTaskExecutor;
import sirius.biz.jobs.batch.ExportBatchProcessFactory;
import sirius.biz.jobs.batch.file.ExportCSV;
import sirius.biz.jobs.batch.file.ExportFileType;
import sirius.biz.jobs.batch.file.ExportXLS;
import sirius.biz.jobs.batch.file.ExportXLSX;
import sirius.biz.jobs.batch.file.LineBasedExport;
import sirius.biz.process.logs.ProcessLog;
import sirius.biz.process.output.ProcessOutput;
import sirius.biz.process.output.TableProcessOutputType;
import sirius.db.es.Elastic;
import sirius.kernel.commons.Files;
import sirius.kernel.commons.Json;
import sirius.kernel.commons.Strings;
import sirius.kernel.di.std.Part;
import sirius.kernel.health.Exceptions;
import sirius.kernel.health.Log;
import sirius.kernel.nls.NLS;
import sirius.web.http.WebContext;
import javax.annotation.Nullable;
import java.io.IOException;
import java.io.OutputStream;
import java.io.OutputStreamWriter;
import java.util.concurrent.atomic.AtomicInteger;
/**
* Responsible for exporting a {@link ProcessOutput} as MS Excel or CSV file.
* <p>
* At first, this might look like an overkill (running the export on a dedicated node within the restarted process).
* But since an export might be very large - and at least for Excel we have to keep a lot (or even all) data in memory,
* we do not want a web server node to perform this task.
*
* @see ProcessController#exportOutput(WebContext, String, String, String)
*/
public class ExportLogsAsFileTaskExecutor implements DistributedTaskExecutor {
/**
* Contains the context key used to transmit the process to export an output for.
*/
public static final String CONTEXT_PROCESS = "process";
/**
* Contains the context key used to transmit the name of the output to export. Left empty to
* export the log messages of the process.
*/
public static final String CONTEXT_OUTPUT = "output";
/**
* Contains the context key used to transmit the desired export file format.
*/
public static final String CONTEXT_FORMAT = "format";
@Part
@Nullable
private Processes processes;
@Part
private Elastic elastic;
@Part
private Cells cells;
@Part
private TableProcessOutputType tableProcessOutputType;
@Override
public String queueName() {
// We use the same queue as generic export jobs as the task is almost the same...
return ExportBatchProcessFactory.ExportBatchProcessTaskExecutor.QUEUE_NAME;
}
@Override
public void executeWork(ObjectNode context) throws Exception {
String processId = context.path(CONTEXT_PROCESS).asText(null);
processes.purgeProcessFromFirstLevelCache(processId);
processes.execute(processId, process -> executeInProcess(context, process));
}
private void executeInProcess(ObjectNode context, ProcessContext processContext) {
String outputName = context.path(CONTEXT_OUTPUT).asText(null);
ProcessOutput processOutput = Strings.isFilled(outputName) ?
processContext.fetchOutput(outputName)
.orElseThrow(() -> new IllegalArgumentException(Strings.apply(
"Unknown output: %s",
outputName))) :
null;
try (LineBasedExport export = createExport(processContext,
fetchExportFileType(context),
processOutput != null ?
processOutput.getLabel() :
NLS.get("ProcessLog.plural"))) {
AtomicInteger rowCount = new AtomicInteger(0);
processContext.fetchOutputEntries(outputName, (columns, labels) -> {
try {
export.addListRow(labels);
} catch (IOException exception) {
throw Exceptions.handle()
.to(Log.BACKGROUND)
.error(exception)
.withSystemErrorMessage("An error occurred while exporting a row: %s (%s)")
.handle();
}
}, (columns, values) -> {
try {
processContext.tryUpdateState(NLS.fmtr("Process.rowsExported")
.set("rows", rowCount.incrementAndGet())
.format());
export.addListRow(values);
return true;
} catch (IOException exception) {
throw Exceptions.handle()
.to(Log.BACKGROUND)
.error(exception)
.withSystemErrorMessage("An error occurred while exporting a row: %s (%s)")
.handle();
}
});
processContext.forceUpdateState(NLS.fmtr("Process.rowsExported").set("rows", rowCount.get()).format());
} catch (Exception exception) {
processContext.handle(exception);
}
processContext.log(ProcessLog.success().withNLSKey("ExportLogsAsFileTaskExecutor.completed"));
}
private ExportFileType fetchExportFileType(ObjectNode context) {
return Json.convertToValue(context.get(CONTEXT_FORMAT))
.getEnum(ExportFileType.class)
.orElse(ExportFileType.XLSX);
}
private LineBasedExport createExport(ProcessContext processContext, ExportFileType type, String name)
throws IOException {
String filename = Files.toSaneFileName(name).orElse("export") + "." + type.name().toLowerCase();
OutputStream outputStream = processContext.addFile(filename);
processContext.log(ProcessLog.info()
.withNLSKey("ExportLogsAsFileTaskExecutor.reportTargetFile")
.withContext("outputLabel", name)
.withContext("filename", filename));
if (type == ExportFileType.CSV) {
return new ExportCSV(new OutputStreamWriter(outputStream));
}
if (type == ExportFileType.XLS) {
return new ExportXLS(() -> outputStream);
}
if (type == ExportFileType.XLSX) {
return new ExportXLSX(() -> outputStream);
}
throw new IllegalArgumentException("Unknown export type: " + type);
}
}