Skip to content

Commit

Permalink
Add ability to stream logs from host services to cloud
Browse files Browse the repository at this point in the history
Add `os-power-mode.service` and `os-fan-profile.service` which report status
from applying power mode and fan profile configs as read from config.json.
The Supervisor sets these configs in config.json for these host services
to pick up and apply.

Relates-to: #2379
See: balena-io/open-balena-api#1792
See: balena-os/balena-jetson-orin#513
Change-type: minor
Signed-off-by: Christina Ying Wang <christina@balena.io>
  • Loading branch information
cywang117 committed Nov 6, 2024
1 parent 585cc8d commit ff5e946
Show file tree
Hide file tree
Showing 5 changed files with 50 additions and 9 deletions.
14 changes: 11 additions & 3 deletions src/lib/journald.ts
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ export interface SpawnJournalctlOpts {
unit?: string;
containerId?: string;
format: string;
filterString?: string;
filter?: string | string[];
since?: string;
until?: string;
boot?: string;
Expand Down Expand Up @@ -62,8 +62,16 @@ export function spawnJournalctl(opts: SpawnJournalctlOpts): ChildProcess {
args.push('-o');
args.push(opts.format);

if (opts.filterString) {
args.push(opts.filterString);
if (opts.filter != null) {
// A single filter argument without spaces can be passed as a string
if (typeof opts.filter === 'string') {
args.push(opts.filter);
} else {
// Multiple filter arguments need to be passed as an array of strings
// instead of a single string with spaces, as `spawn` will interpret
// the single string as a single argument to journalctl, which is invalid.
args.push(...opts.filter);
}
}

log.debug('Spawning journalctl', args.join(' '));
Expand Down
1 change: 1 addition & 0 deletions src/logger.ts
Original file line number Diff line number Diff line change
Expand Up @@ -127,6 +127,7 @@ export function logSystemMessage(
);
}
}
export type LogSystemMessage = typeof logSystemMessage;

export function lock(containerId: string): Bluebird.Disposer<() => void> {
return takeGlobalLockRW(containerId).disposer((release) => {
Expand Down
39 changes: 34 additions & 5 deletions src/logging/monitor.ts
Original file line number Diff line number Diff line change
@@ -1,8 +1,9 @@
import { pipeline } from 'stream/promises';
import { setTimeout } from 'timers/promises';

import { spawnJournalctl, toJournalDate } from '../lib/journald';
import log from '../lib/supervisor-console';
import { setTimeout } from 'timers/promises';
import type { LogSystemMessage } from '../logger';

export type MonitorHook = (message: {
message: string;
Expand All @@ -18,6 +19,7 @@ interface JournalRow {
MESSAGE: string | number[];
PRIORITY: string;
__REALTIME_TIMESTAMP: string;
_SYSTEMD_UNIT?: string;
}

// Wait 5s when journalctl failed before trying to read the logs again
Expand Down Expand Up @@ -55,6 +57,9 @@ async function* splitStream(chunkIterable: AsyncIterable<any>) {
* Streams logs from journalctl and calls container hooks when a record is received matching container id
*/
class LogMonitor {
// Additional host services we want to stream the logs for
private HOST_SERVICES = ['os-power-mode.service', 'os-fan-profile.service'];

private containers: {
[containerId: string]: {
hook: MonitorHook;
Expand All @@ -65,18 +70,23 @@ class LogMonitor {
// Only stream logs since the start of the supervisor
private lastSentTimestamp = Date.now() - performance.now();

public async start(): Promise<void> {
public async start(logSystemMessage: LogSystemMessage): Promise<void> {
try {
// TODO: do not spawn journalctl if logging is not enabled
const { stdout, stderr } = spawnJournalctl({
all: true,
follow: true,
format: 'json',
filterString: '_SYSTEMD_UNIT=balena.service',
filter: [
// Monitor logs from balenad by default for container log-streaming
'balena.service',
// Add any host services we want to stream
...this.HOST_SERVICES,
].map((s) => `_SYSTEMD_UNIT=${s}`),
since: toJournalDate(this.lastSentTimestamp),
});
if (!stdout) {
// this will be catched below
// This error will be caught below
throw new Error('failed to open process stream');
}

Expand All @@ -96,6 +106,8 @@ class LogMonitor {
self.containers[row.CONTAINER_ID_FULL]
) {
await self.handleRow(row);
} else if (self.HOST_SERVICES.includes(row._SYSTEMD_UNIT)) {
await self.handleHostServiceRow(row, logSystemMessage);
}
} catch {
// ignore parsing errors
Expand All @@ -116,7 +128,7 @@ class LogMonitor {
`Spawning another process to watch host service logs in ${wait / 1000}s`,
);
await setTimeout(wait);
void this.start();
void this.start(logSystemMessage);
}

public isAttached(containerId: string): boolean {
Expand Down Expand Up @@ -157,6 +169,23 @@ class LogMonitor {
await this.containers[containerId].hook({ message, isStdErr, timestamp });
this.lastSentTimestamp = timestamp;
}

private async handleHostServiceRow(
row: JournalRow & { _SYSTEMD_UNIT: string },
logSystemMessage: LogSystemMessage,
) {
const message = messageFieldToString(row.MESSAGE);
if (message == null) {
return;
}
// Prune '.service' from the end of the service name
void logSystemMessage(
`${row._SYSTEMD_UNIT.replace(/\.service$/, '')}: ${message}`,
{},
undefined,
false,
);
}
}

const logMonitor = new LogMonitor();
Expand Down
2 changes: 1 addition & 1 deletion src/supervisor.ts
Original file line number Diff line number Diff line change
Expand Up @@ -84,7 +84,7 @@ export class Supervisor {
apiBinder.start(),
]);

await logMonitor.start();
await logMonitor.start(logger.logSystemMessage);
}
}

Expand Down
3 changes: 3 additions & 0 deletions test/unit/lib/journald.spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ describe('lib/journald', () => {
unit: 'nginx.service',
containerId: 'abc123',
format: 'json-pretty',
filter: ['_SYSTEMD_UNIT=test.service', '_SYSTEMD_UNIT=test2.service'],
since: '2014-03-25 03:59:56.654563',
until: '2014-03-25 03:59:59.654563',
boot: '0',
Expand All @@ -51,6 +52,8 @@ describe('lib/journald', () => {
'2014-03-25 03:59:59.654563',
'-b',
'0',
'_SYSTEMD_UNIT=test.service',
'_SYSTEMD_UNIT=test2.service',
];

const actualCommand = spawn.firstCall.args[0];
Expand Down

0 comments on commit ff5e946

Please sign in to comment.