From 1d649d63ac2589b54218a40f9138456fc961a9c9 Mon Sep 17 00:00:00 2001 From: Seweryn Kras Date: Tue, 11 Jun 2024 13:32:14 +0200 Subject: [PATCH 01/10] feat: market scan --- README.md | 26 +++--- examples/advanced/scan.ts | 57 ++++++++++++ src/market/api.ts | 6 ++ src/market/index.ts | 1 + src/market/market.module.ts | 24 +++++ src/market/scan/index.ts | 3 + src/market/scan/scan-director.ts | 81 +++++++++++++++++ src/market/scan/scanned-proposal.ts | 91 +++++++++++++++++++ src/market/scan/types.ts | 17 ++++ .../yagna/adapters/market-api-adapter.ts | 57 ++++++++++++ tests/examples/examples.json | 1 + 11 files changed, 352 insertions(+), 12 deletions(-) create mode 100644 examples/advanced/scan.ts create mode 100644 src/market/scan/index.ts create mode 100644 src/market/scan/scan-director.ts create mode 100644 src/market/scan/scanned-proposal.ts create mode 100644 src/market/scan/types.ts diff --git a/README.md b/README.md index e177d282f..2da6185dd 100644 --- a/README.md +++ b/README.md @@ -31,6 +31,7 @@ - [Custom filters](#custom-filters) - [Custom ranking of proposals](#custom-ranking-of-proposals) - [Uploading local images to the provider](#uploading-local-images-to-the-provider) + - [Market scan](#market-scan) - [Going further](#going-further) - [More examples](#more-examples) - [Debugging](#debugging) @@ -356,25 +357,26 @@ const order: MarketOrderSpec = { [Check the full example](./examples/advanced//local-image/) - +[Check the full example](./examples/advanced/scan.ts) ## Going further diff --git a/examples/advanced/scan.ts b/examples/advanced/scan.ts new file mode 100644 index 000000000..9a7000c0a --- /dev/null +++ b/examples/advanced/scan.ts @@ -0,0 +1,57 @@ +/** + * This example demonstrates how to scan the market for providers that meet specific requirements. + */ +import { GolemNetwork, ScanOptions } from "@golem-sdk/golem-js"; +import { last, map, scan, takeUntil, tap, timer } from "rxjs"; + +// What providers are we looking for? +const scanOptions: ScanOptions = { + subnetTag: "public", + workload: { + engine: "vm", + minCpuCores: 4, + minMemGib: 8, + minCpuThreads: 8, + capabilities: ["vpn"], + minStorageGib: 16, + }, +}; + +(async () => { + const glm = new GolemNetwork(); + await glm.connect(); + const spec = glm.market.buildScanSpecification(scanOptions); + + const SCAN_DURATION_MS = 10_000; + + console.log(`Scanning for ${SCAN_DURATION_MS / 1000} seconds...`); + glm.market + .scan(spec) + .pipe( + tap((scannedProposal) => { + console.log("Received proposal from:", scannedProposal.getProviderInfo().name); + }), + // calculate the cost of an hour of work + map( + (scannedProposal) => + scannedProposal.pricing.start + // + scannedProposal.pricing.cpuSec * 3600 + + scannedProposal.pricing.envSec * 3600, + ), + // calculate the running average + scan((total, cost) => total + cost, 0), + map((totalCost, index) => totalCost / (index + 1)), + // stop scanning after SCAN_DURATION_MS + takeUntil(timer(SCAN_DURATION_MS)), + last(), + ) + .subscribe({ + next: (averageCost) => { + console.log("Average cost for an hour of work:", averageCost.toFixed(6), "GLM"); + }, + complete: () => { + console.log("Scan completed, shutting down..."); + glm.disconnect(); + }, + }); +})(); diff --git a/src/market/api.ts b/src/market/api.ts index dd73df10b..c95ce81d8 100644 --- a/src/market/api.ts +++ b/src/market/api.ts @@ -18,6 +18,7 @@ import { AgreementTerminatedEvent, } from "./agreement"; import { AgreementOptions } from "./agreement/agreement"; +import { ScanSpecification, ScannedOffer } from "./scan"; export type MarketEvents = { demandSubscriptionStarted: (demand: Demand) => void; @@ -140,4 +141,9 @@ export interface IMarketApi { * Retrieves the state of an agreement based on the provided agreement ID. */ getAgreementState(id: string): Promise; + + /** + * Scan the market for offers that match the given specification. + */ + scan(scanSpecification: ScanSpecification): Observable; } diff --git a/src/market/index.ts b/src/market/index.ts index 8b2122145..e50e2ec83 100644 --- a/src/market/index.ts +++ b/src/market/index.ts @@ -12,3 +12,4 @@ export { BasicDemandDirector } from "./demand/directors/basic-demand-director"; export { PaymentDemandDirector } from "./demand/directors/payment-demand-director"; export { WorkloadDemandDirector } from "./demand/directors/workload-demand-director"; export * from "./proposal/market-proposal-event"; +export * from "./scan"; diff --git a/src/market/market.module.ts b/src/market/market.module.ts index f835a95ea..af1f186db 100644 --- a/src/market/market.module.ts +++ b/src/market/market.module.ts @@ -42,6 +42,7 @@ import { MarketOrderSpec } from "../golem-network"; import { INetworkApi, NetworkModule } from "../network"; import { AgreementOptions } from "./agreement/agreement"; import { Concurrency } from "../lease-process"; +import { ScanDirector, ScanOptions, ScanSpecification, ScannedOffer } from "./scan"; export type DemandEngine = "vm" | "vm-nvidia" | "wasmtime"; @@ -82,6 +83,13 @@ export interface MarketModule { */ buildDemandDetails(options: BuildDemandOptions, allocation: Allocation): Promise; + /** + * Build a ScanSpecification that can be used to scan the market for offers. + * The difference between this method and `buildDemandDetails` is that this method does not require an + * allocation, doesn't set payment related properties and doesn't provide any defaults. + */ + buildScanSpecification(options: ScanOptions): ScanSpecification; + /** * Publishes the demand to the market and handles refreshing it when needed. * Each time the demand is refreshed, a new demand is emitted by the observable. @@ -189,6 +197,11 @@ export interface MarketModule { * Fetch the most up-to-date agreement details from the yagna */ fetchAgreement(agreementId: string): Promise; + + /** + * Scan the market for offers that match the given demand specification. + */ + scan(scanSpecification: ScanSpecification): Observable; } /** @@ -265,6 +278,13 @@ export class MarketModuleImpl implements MarketModule { return new DemandSpecification(builder.getProduct(), allocation.paymentPlatform, basicConfig.expirationSec); } + buildScanSpecification(options: ScanOptions): ScanSpecification { + const builder = new DemandBodyBuilder(); + const director = new ScanDirector(options); + director.apply(builder); + return builder.getProduct(); + } + /** * Augments the user-provided options with additional logic * @@ -649,4 +669,8 @@ export class MarketModuleImpl implements MarketModule { } return isPriceValid; } + + scan(scanSpecification: ScanSpecification): Observable { + return this.deps.marketApi.scan(scanSpecification); + } } diff --git a/src/market/scan/index.ts b/src/market/scan/index.ts new file mode 100644 index 000000000..a6e78b177 --- /dev/null +++ b/src/market/scan/index.ts @@ -0,0 +1,3 @@ +export * from "./types"; +export * from "./scan-director"; +export * from "./scanned-proposal"; diff --git a/src/market/scan/scan-director.ts b/src/market/scan/scan-director.ts new file mode 100644 index 000000000..c2e5a8cd1 --- /dev/null +++ b/src/market/scan/scan-director.ts @@ -0,0 +1,81 @@ +import { ComparisonOperator, DemandBodyBuilder } from "../demand"; +import { ScanOptions } from "./types"; + +export class ScanDirector { + constructor(private options: ScanOptions) {} + + public async apply(builder: DemandBodyBuilder) { + this.addWorkloadDecorations(builder); + this.addGenericDecorations(builder); + this.addManifestDecorations(builder); + this.addPaymentDecorations(builder); + } + + private addPaymentDecorations(builder: DemandBodyBuilder): void { + if (this.options.payment?.debitNotesAcceptanceTimeoutSec) { + builder.addProperty( + "golem.com.payment.debit-notes.accept-timeout?", + this.options.payment?.debitNotesAcceptanceTimeoutSec, + ); + } + if (this.options.payment?.midAgreementDebitNoteIntervalSec) { + builder.addProperty( + "golem.com.scheme.payu.debit-note.interval-sec?", + this.options.payment?.midAgreementDebitNoteIntervalSec, + ); + } + if (this.options.payment?.midAgreementPaymentTimeoutSec) { + builder.addProperty( + "golem.com.scheme.payu.payment-timeout-sec?", + this.options.payment?.midAgreementPaymentTimeoutSec, + ); + } + } + + private addWorkloadDecorations(builder: DemandBodyBuilder): void { + if (this.options.workload?.engine) { + builder.addConstraint("golem.runtime.name", this.options.workload?.engine); + } + if (this.options.workload?.capabilities) + this.options.workload?.capabilities.forEach((cap) => builder.addConstraint("golem.runtime.capabilities", cap)); + + if (this.options.workload?.minMemGib) { + builder.addConstraint("golem.inf.mem.gib", this.options.workload?.minMemGib, ComparisonOperator.GtEq); + } + if (this.options.workload?.minStorageGib) { + builder.addConstraint("golem.inf.storage.gib", this.options.workload?.minStorageGib, ComparisonOperator.GtEq); + } + if (this.options.workload?.minCpuThreads) { + builder.addConstraint("golem.inf.cpu.threads", this.options.workload?.minCpuThreads, ComparisonOperator.GtEq); + } + if (this.options.workload?.minCpuCores) { + builder.addConstraint("golem.inf.cpu.cores", this.options.workload?.minCpuCores, ComparisonOperator.GtEq); + } + } + + private addGenericDecorations(builder: DemandBodyBuilder): void { + if (this.options.subnetTag) { + builder + .addProperty("golem.node.debug.subnet", this.options.subnetTag) + .addConstraint("golem.node.debug.subnet", this.options.subnetTag); + } + + if (this.options.expirationSec) { + builder.addProperty("golem.srv.comp.expiration", Date.now() + this.options.expirationSec * 1000); + } + } + + private addManifestDecorations(builder: DemandBodyBuilder): void { + if (!this.options.workload?.manifest) return; + builder.addProperty("golem.srv.comp.payload", this.options.workload?.manifest); + if (this.options.workload?.manifestSig) { + builder.addProperty("golem.srv.comp.payload.sig", this.options.workload?.manifestSig); + } + if (this.options.workload?.manifestSigAlgorithm) { + builder.addProperty("golem.srv.comp.payload.sig.algorithm", this.options.workload?.manifestSigAlgorithm); + } + if (this.options.workload?.manifestCert) { + builder.addProperty("golem.srv.comp.payload.cert", this.options.workload?.manifestCert); + } + } +} diff --git a/src/market/scan/scanned-proposal.ts b/src/market/scan/scanned-proposal.ts new file mode 100644 index 000000000..a72b73322 --- /dev/null +++ b/src/market/scan/scanned-proposal.ts @@ -0,0 +1,91 @@ +import { PricingInfo, ProposalProperties } from "../proposal"; +import { GolemInternalError } from "../../shared/error/golem-error"; + +// Raw response from yagna +// TODO: add to ya-client +type ScannedOfferDTO = { + properties: ProposalProperties; + constraints: string; + offerId: string; + providerId: string; + timestamp: string; +}; + +export class ScannedOffer { + constructor(private readonly model: ScannedOfferDTO) {} + + get properties(): ProposalProperties { + return this.model.properties; + } + + get constraints(): string { + return this.model.constraints; + } + + get pricing(): PricingInfo { + const usageVector = this.properties["golem.com.usage.vector"]; + const priceVector = this.properties["golem.com.pricing.model.linear.coeffs"]; + + if (!usageVector) { + throw new GolemInternalError( + "The proposal does not contain 'golem.com.usage.vector' property. We can't estimate the costs.", + ); + } + + if (!priceVector) { + throw new GolemInternalError( + "The proposal does not contain 'golem.com.pricing.model.linear.coeffs' property. We can't estimate costs.", + ); + } + + const envIdx = usageVector.findIndex((ele) => ele === "golem.usage.duration_sec"); + const cpuIdx = usageVector.findIndex((ele) => ele === "golem.usage.cpu_sec"); + + const envSec = priceVector[envIdx] ?? 0.0; + const cpuSec = priceVector[cpuIdx] ?? 0.0; + const start = priceVector[priceVector.length - 1]; + + return { + cpuSec, + envSec, + start, + }; + } + + getProviderInfo() { + return { + id: this.model.providerId, + name: this.properties["golem.node.id.name"] || "", + }; + } + get transferProtocol() { + return this.properties["golem.activity.caps.transfer.protocol"]; + } + get cpuBrand() { + return this.properties["golem.inf.cpu.brand"]; + } + get cpuCapabilities() { + return this.properties["golem.inf.cpu.capabilities"]; + } + get cpuCores() { + return this.properties["golem.inf.cpu.cores"]; + } + get cpuThreads() { + return this.properties["golem.inf.cpu.threads"]; + } + get memory() { + return this.properties["golem.inf.mem.gib"]; + } + get storage() { + return this.properties["golem.inf.storage.gib"]; + } + get publicNet() { + return this.properties["golem.node.net.is-public"]; + } + get runtimeCapabilities() { + return this.properties["golem.runtime.capabilities"]; + } + get runtimeName() { + return this.properties["golem.runtime.name"]; + } +} diff --git a/src/market/scan/types.ts b/src/market/scan/types.ts new file mode 100644 index 000000000..fcfd812f2 --- /dev/null +++ b/src/market/scan/types.ts @@ -0,0 +1,17 @@ +import { BuildDemandOptions, DemandProperty } from "../demand"; + +// recursively make all properties optional (but not array members) +type DeepPartial = T extends object + ? T extends Array + ? T + : { + [P in keyof T]?: DeepPartial; + } + : T; + +export type ScanOptions = DeepPartial; + +export type ScanSpecification = { + properties: DemandProperty[]; + constraints: string[]; +}; diff --git a/src/shared/yagna/adapters/market-api-adapter.ts b/src/shared/yagna/adapters/market-api-adapter.ts index 0dd5c8cf4..2b0e01ec5 100644 --- a/src/shared/yagna/adapters/market-api-adapter.ts +++ b/src/shared/yagna/adapters/market-api-adapter.ts @@ -19,6 +19,8 @@ import { getMessageFromApiError } from "../../utils/apiErrorMessage"; import { withTimeout } from "../../utils/timeout"; import { AgreementOptions, IAgreementRepository } from "../../../market/agreement/agreement"; import { IProposalRepository, MarketProposal, OfferCounterProposal } from "../../../market/proposal"; +import EventSource from "eventsource"; +import { ScanSpecification, ScannedOffer } from "../../../market/scan"; /** * A bit more user-friendly type definition of DemandOfferBaseDTO from ya-ts-client @@ -393,4 +395,59 @@ export class MarketApiAdapter implements IMarketApi { private isOfferCounterProposal(proposal: MarketProposal): proposal is OfferCounterProposal { return proposal.issuer === "Requestor"; } + + public scan(spec: ScanSpecification): Observable { + const ac = new AbortController(); + return new Observable((observer) => { + this.yagnaApi.market.httpRequest + .request({ + body: { + ...this.buildDemandRequestBody(spec), + type: "offer", + }, + method: "POST", + url: "/scan", + }) + .then((iterator) => { + const cleanupIterator = () => + this.yagnaApi.market.httpRequest.request({ + method: "DELETE", + url: `/scan/${iterator}`, + }); + + if (ac.signal.aborted) { + void cleanupIterator(); + return; + } + + const eventSource = new EventSource( + `${this.yagnaApi.market.httpRequest.config.BASE}/scan/${iterator}/events`, + { + headers: { + Accept: "text/event-stream", + Authorization: `Bearer ${this.yagnaApi.yagnaOptions.apiKey}`, + }, + }, + ); + + eventSource.addEventListener("offer", (event) => { + try { + const parsed = JSON.parse(event.data); + observer.next(new ScannedOffer(parsed)); + } catch (error) { + observer.error(error); + } + }); + eventSource.addEventListener("error", (error) => observer.error(error)); + + ac.signal.onabort = () => { + eventSource.close(); + void cleanupIterator(); + }; + }); + return () => { + ac.abort(); + }; + }); + } } diff --git a/tests/examples/examples.json b/tests/examples/examples.json index 85036fefd..9cc36c14b 100644 --- a/tests/examples/examples.json +++ b/tests/examples/examples.json @@ -10,6 +10,7 @@ { "cmd": "tsx", "path": "examples/advanced/proposal-filter.ts" }, { "cmd": "tsx", "path": "examples/advanced/proposal-predefined-filter.ts" }, { "cmd": "tsx", "path": "examples/advanced/override-module.ts" }, + { "cmd": "tsx", "path": "examples/advanced/scan.ts" }, { "cmd": "tsx", "path": "examples/experimental/deployment/new-api.ts" }, { "cmd": "tsx", "path": "examples/experimental/job/getJobById.ts" }, { "cmd": "tsx", "path": "examples/experimental/job/waitForResults.ts" }, From 06fd76a177b88dfbc3cacee1f79b821559799326 Mon Sep 17 00:00:00 2001 From: Seweryn Kras Date: Tue, 11 Jun 2024 13:36:15 +0200 Subject: [PATCH 02/10] docs: add advanced scan to examples package.json scripts --- examples/package.json | 1 + 1 file changed, 1 insertion(+) diff --git a/examples/package.json b/examples/package.json index 906ec5caa..6dabfff1a 100644 --- a/examples/package.json +++ b/examples/package.json @@ -16,6 +16,7 @@ "advanced-payment-filters": "tsx advanced/payment-filters.ts", "advanced-proposal-filters": "tsx advanced/proposal-filter.ts", "advanced-proposal-predefined-filter": "tsx advanced/proposal-predefined-filter.ts", + "advanced-scan": "tsx advanced/scan.ts", "local-image": "tsx advanced/local-image/serveLocalGvmi.ts", "deployment": "tsx experimental/deployment/new-api.ts", "market-scan": "tsx market/scan.ts", From e59bd18724b59e78b1df468654404c3671fd3f6e Mon Sep 17 00:00:00 2001 From: Seweryn Kras Date: Tue, 11 Jun 2024 13:39:19 +0200 Subject: [PATCH 03/10] docs: update wording around scanned offers --- README.md | 4 ++-- examples/advanced/scan.ts | 12 ++++++------ 2 files changed, 8 insertions(+), 8 deletions(-) diff --git a/README.md b/README.md index 2da6185dd..506673f0b 100644 --- a/README.md +++ b/README.md @@ -367,8 +367,8 @@ await glm.market .scan(order) .pipe(takeUntil(timer(10_000))) .subscribe({ - next: (proposal) => { - console.log("Received proposal from provider", proposal.provider.name); + next: (scannedOffer) => { + console.log("Found offer from", scannedOffer.getProviderInfo().name); }, complete: () => { console.log("Market scan completed"); diff --git a/examples/advanced/scan.ts b/examples/advanced/scan.ts index 9a7000c0a..587c8b2a0 100644 --- a/examples/advanced/scan.ts +++ b/examples/advanced/scan.ts @@ -28,15 +28,15 @@ const scanOptions: ScanOptions = { glm.market .scan(spec) .pipe( - tap((scannedProposal) => { - console.log("Received proposal from:", scannedProposal.getProviderInfo().name); + tap((scannedOffer) => { + console.log("Found offer from", scannedOffer.getProviderInfo().name); }), // calculate the cost of an hour of work map( - (scannedProposal) => - scannedProposal.pricing.start + // - scannedProposal.pricing.cpuSec * 3600 + - scannedProposal.pricing.envSec * 3600, + (scannedOffer) => + scannedOffer.pricing.start + // + scannedOffer.pricing.cpuSec * 3600 + + scannedOffer.pricing.envSec * 3600, ), // calculate the running average scan((total, cost) => total + cost, 0), From 8ae7e4f632ac47a61cb212f03db877ea207c4002 Mon Sep 17 00:00:00 2001 From: Seweryn Kras Date: Tue, 11 Jun 2024 13:47:54 +0200 Subject: [PATCH 04/10] docs: add note of advanced usage to scan example --- examples/advanced/scan.ts | 7 +++++++ 1 file changed, 7 insertions(+) diff --git a/examples/advanced/scan.ts b/examples/advanced/scan.ts index 587c8b2a0..f55a01268 100644 --- a/examples/advanced/scan.ts +++ b/examples/advanced/scan.ts @@ -22,6 +22,13 @@ const scanOptions: ScanOptions = { await glm.connect(); const spec = glm.market.buildScanSpecification(scanOptions); + /* For advanced users: you can also add properties and constraints manually: + spec.properties.push({ + key: "golem.inf.cpu.architecture", + value: "x86_64", + }); + */ + const SCAN_DURATION_MS = 10_000; console.log(`Scanning for ${SCAN_DURATION_MS / 1000} seconds...`); From a9c77ed217b4f2cb8f5608738dc42f68fedde0f9 Mon Sep 17 00:00:00 2001 From: Seweryn Kras Date: Tue, 11 Jun 2024 13:51:38 +0200 Subject: [PATCH 05/10] fix: catch api errors when using scan iterators --- src/shared/yagna/adapters/market-api-adapter.ts | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/src/shared/yagna/adapters/market-api-adapter.ts b/src/shared/yagna/adapters/market-api-adapter.ts index 2b0e01ec5..81c7d1173 100644 --- a/src/shared/yagna/adapters/market-api-adapter.ts +++ b/src/shared/yagna/adapters/market-api-adapter.ts @@ -444,7 +444,8 @@ export class MarketApiAdapter implements IMarketApi { eventSource.close(); void cleanupIterator(); }; - }); + }) + .catch((error) => observer.error(error)); return () => { ac.abort(); }; From 6cf63a0367200a452941700e40f629d939e91e4a Mon Sep 17 00:00:00 2001 From: Seweryn Kras Date: Thu, 13 Jun 2024 14:06:23 +0200 Subject: [PATCH 06/10] chore: replace `getProvider` method with an actual getter --- README.md | 2 +- examples/advanced/scan.ts | 2 +- src/market/scan/scanned-proposal.ts | 2 +- 3 files changed, 3 insertions(+), 3 deletions(-) diff --git a/README.md b/README.md index 506673f0b..211eba31c 100644 --- a/README.md +++ b/README.md @@ -368,7 +368,7 @@ await glm.market .pipe(takeUntil(timer(10_000))) .subscribe({ next: (scannedOffer) => { - console.log("Found offer from", scannedOffer.getProviderInfo().name); + console.log("Found offer from", scannedOffer.provider.name); }, complete: () => { console.log("Market scan completed"); diff --git a/examples/advanced/scan.ts b/examples/advanced/scan.ts index f55a01268..f0838b737 100644 --- a/examples/advanced/scan.ts +++ b/examples/advanced/scan.ts @@ -36,7 +36,7 @@ const scanOptions: ScanOptions = { .scan(spec) .pipe( tap((scannedOffer) => { - console.log("Found offer from", scannedOffer.getProviderInfo().name); + console.log("Found offer from", scannedOffer.provider.name); }), // calculate the cost of an hour of work map( diff --git a/src/market/scan/scanned-proposal.ts b/src/market/scan/scanned-proposal.ts index a72b73322..2d6aa2231 100644 --- a/src/market/scan/scanned-proposal.ts +++ b/src/market/scan/scanned-proposal.ts @@ -52,7 +52,7 @@ export class ScannedOffer { }; } - getProviderInfo() { + get provider() { return { id: this.model.providerId, name: this.properties["golem.node.id.name"] || "", From 3a24944193101792a7f5d090c498d85ea84b8c99 Mon Sep 17 00:00:00 2001 From: Seweryn Kras Date: Fri, 14 Jun 2024 13:06:16 +0200 Subject: [PATCH 07/10] fix: improve error handling in market scan --- examples/advanced/scan.ts | 8 ++------ src/market/error.ts | 1 + src/shared/yagna/adapters/market-api-adapter.ts | 7 ++++++- 3 files changed, 9 insertions(+), 7 deletions(-) diff --git a/examples/advanced/scan.ts b/examples/advanced/scan.ts index f0838b737..82786e73d 100644 --- a/examples/advanced/scan.ts +++ b/examples/advanced/scan.ts @@ -22,12 +22,8 @@ const scanOptions: ScanOptions = { await glm.connect(); const spec = glm.market.buildScanSpecification(scanOptions); - /* For advanced users: you can also add properties and constraints manually: - spec.properties.push({ - key: "golem.inf.cpu.architecture", - value: "x86_64", - }); - */ + // For advanced users: you can also add properties and constraints manually: + // spec.constraints.push("(golem.node.id.name=my-favorite-provider)"); const SCAN_DURATION_MS = 10_000; diff --git a/src/market/error.ts b/src/market/error.ts index 1d54f6a43..9317bc988 100644 --- a/src/market/error.ts +++ b/src/market/error.ts @@ -15,6 +15,7 @@ export enum MarketErrorCode { AgreementApprovalFailed = "AgreementApprovalFailed", NoProposalAvailable = "NoProposalAvailable", InternalError = "InternalError", + ScanFailed = "ScanFailed", } export class GolemMarketError extends GolemModuleError { diff --git a/src/shared/yagna/adapters/market-api-adapter.ts b/src/shared/yagna/adapters/market-api-adapter.ts index 81c7d1173..e6ccc4d6a 100644 --- a/src/shared/yagna/adapters/market-api-adapter.ts +++ b/src/shared/yagna/adapters/market-api-adapter.ts @@ -445,7 +445,12 @@ export class MarketApiAdapter implements IMarketApi { void cleanupIterator(); }; }) - .catch((error) => observer.error(error)); + .catch((error) => { + const message = getMessageFromApiError(error); + observer.error( + new GolemMarketError(`Error while scanning for offers. ${message}`, MarketErrorCode.ScanFailed, error), + ); + }); return () => { ac.abort(); }; From 0c45febe3479376ed25efb1a0bb682dab91ff625 Mon Sep 17 00:00:00 2001 From: Seweryn Kras Date: Fri, 14 Jun 2024 15:33:01 +0200 Subject: [PATCH 08/10] feat: remove properties from scan spec builder and add payment platform --- examples/advanced/scan.ts | 11 +++- src/market/market.module.ts | 3 +- src/market/scan/scan-director.ts | 58 ++++++------------- src/market/scan/types.ts | 35 ++++++----- .../yagna/adapters/market-api-adapter.ts | 6 +- 5 files changed, 53 insertions(+), 60 deletions(-) diff --git a/examples/advanced/scan.ts b/examples/advanced/scan.ts index 82786e73d..cdcc749a0 100644 --- a/examples/advanced/scan.ts +++ b/examples/advanced/scan.ts @@ -6,15 +6,20 @@ import { last, map, scan, takeUntil, tap, timer } from "rxjs"; // What providers are we looking for? const scanOptions: ScanOptions = { - subnetTag: "public", + // fairly powerful machine but not too powerful workload: { engine: "vm", minCpuCores: 4, - minMemGib: 8, - minCpuThreads: 8, + maxCpuCores: 16, + minMemGib: 4, + maxMemGib: 8, capabilities: ["vpn"], minStorageGib: 16, }, + // let's only look at mainnet providers only + payment: { + network: "polygon", + }, }; (async () => { diff --git a/src/market/market.module.ts b/src/market/market.module.ts index 053050724..8c87768d8 100644 --- a/src/market/market.module.ts +++ b/src/market/market.module.ts @@ -86,7 +86,8 @@ export interface MarketModule { /** * Build a ScanSpecification that can be used to scan the market for offers. * The difference between this method and `buildDemandDetails` is that this method does not require an - * allocation, doesn't set payment related properties and doesn't provide any defaults. + * allocation, doesn't inherit payment properties from `GolemNetwork` settings and doesn't provide any defaults. + * If you wish to set the payment platform, you need to specify it in the ScanOptions. */ buildScanSpecification(options: ScanOptions): ScanSpecification; diff --git a/src/market/scan/scan-director.ts b/src/market/scan/scan-director.ts index c2e5a8cd1..56b5dcd89 100644 --- a/src/market/scan/scan-director.ts +++ b/src/market/scan/scan-director.ts @@ -7,29 +7,15 @@ export class ScanDirector { public async apply(builder: DemandBodyBuilder) { this.addWorkloadDecorations(builder); this.addGenericDecorations(builder); - this.addManifestDecorations(builder); this.addPaymentDecorations(builder); } private addPaymentDecorations(builder: DemandBodyBuilder): void { - if (this.options.payment?.debitNotesAcceptanceTimeoutSec) { - builder.addProperty( - "golem.com.payment.debit-notes.accept-timeout?", - this.options.payment?.debitNotesAcceptanceTimeoutSec, - ); - } - if (this.options.payment?.midAgreementDebitNoteIntervalSec) { - builder.addProperty( - "golem.com.scheme.payu.debit-note.interval-sec?", - this.options.payment?.midAgreementDebitNoteIntervalSec, - ); - } - if (this.options.payment?.midAgreementPaymentTimeoutSec) { - builder.addProperty( - "golem.com.scheme.payu.payment-timeout-sec?", - this.options.payment?.midAgreementPaymentTimeoutSec, - ); - } + if (!this.options.payment) return; + const network = this.options.payment.network; + const driver = this.options.payment.driver || "erc20"; + const token = this.options.payment.token || ["mainnet", "polygon"].includes(network) ? "glm" : "tglm"; + builder.addConstraint(`golem.com.payment.platform.${driver}-${network}-${token}.address`, "*"); } private addWorkloadDecorations(builder: DemandBodyBuilder): void { @@ -42,40 +28,32 @@ export class ScanDirector { if (this.options.workload?.minMemGib) { builder.addConstraint("golem.inf.mem.gib", this.options.workload?.minMemGib, ComparisonOperator.GtEq); } + if (this.options.workload?.maxMemGib) { + builder.addConstraint("golem.inf.mem.gib", this.options.workload?.maxMemGib, ComparisonOperator.LtEq); + } if (this.options.workload?.minStorageGib) { builder.addConstraint("golem.inf.storage.gib", this.options.workload?.minStorageGib, ComparisonOperator.GtEq); } + if (this.options.workload?.maxStorageGib) { + builder.addConstraint("golem.inf.storage.gib", this.options.workload?.maxStorageGib, ComparisonOperator.LtEq); + } if (this.options.workload?.minCpuThreads) { builder.addConstraint("golem.inf.cpu.threads", this.options.workload?.minCpuThreads, ComparisonOperator.GtEq); } + if (this.options.workload?.maxCpuThreads) { + builder.addConstraint("golem.inf.cpu.threads", this.options.workload?.maxCpuThreads, ComparisonOperator.LtEq); + } if (this.options.workload?.minCpuCores) { builder.addConstraint("golem.inf.cpu.cores", this.options.workload?.minCpuCores, ComparisonOperator.GtEq); } + if (this.options.workload?.maxCpuCores) { + builder.addConstraint("golem.inf.cpu.cores", this.options.workload?.maxCpuCores, ComparisonOperator.LtEq); + } } private addGenericDecorations(builder: DemandBodyBuilder): void { if (this.options.subnetTag) { - builder - .addProperty("golem.node.debug.subnet", this.options.subnetTag) - .addConstraint("golem.node.debug.subnet", this.options.subnetTag); - } - - if (this.options.expirationSec) { - builder.addProperty("golem.srv.comp.expiration", Date.now() + this.options.expirationSec * 1000); - } - } - - private addManifestDecorations(builder: DemandBodyBuilder): void { - if (!this.options.workload?.manifest) return; - builder.addProperty("golem.srv.comp.payload", this.options.workload?.manifest); - if (this.options.workload?.manifestSig) { - builder.addProperty("golem.srv.comp.payload.sig", this.options.workload?.manifestSig); - } - if (this.options.workload?.manifestSigAlgorithm) { - builder.addProperty("golem.srv.comp.payload.sig.algorithm", this.options.workload?.manifestSigAlgorithm); - } - if (this.options.workload?.manifestCert) { - builder.addProperty("golem.srv.comp.payload.cert", this.options.workload?.manifestCert); + builder.addConstraint("golem.node.debug.subnet", this.options.subnetTag); } } } diff --git a/src/market/scan/types.ts b/src/market/scan/types.ts index fcfd812f2..5141c2e14 100644 --- a/src/market/scan/types.ts +++ b/src/market/scan/types.ts @@ -1,17 +1,26 @@ -import { BuildDemandOptions, DemandProperty } from "../demand"; - -// recursively make all properties optional (but not array members) -type DeepPartial = T extends object - ? T extends Array - ? T - : { - [P in keyof T]?: DeepPartial; - } - : T; - -export type ScanOptions = DeepPartial; +export type ScanOptions = { + workload?: { + engine?: string; + capabilities?: string[]; + minMemGib?: number; + maxMemGib?: number; + minStorageGib?: number; + maxStorageGib?: number; + minCpuThreads?: number; + maxCpuThreads?: number; + minCpuCores?: number; + maxCpuCores?: number; + }; + subnetTag?: string; + payment?: { + network: string; + /** @default erc20 */ + driver?: string; + /** @default "glm" if network is mainnet or polygon, "tglm" otherwise */ + token?: string; + }; +}; export type ScanSpecification = { - properties: DemandProperty[]; constraints: string[]; }; diff --git a/src/shared/yagna/adapters/market-api-adapter.ts b/src/shared/yagna/adapters/market-api-adapter.ts index e6ccc4d6a..0e5744891 100644 --- a/src/shared/yagna/adapters/market-api-adapter.ts +++ b/src/shared/yagna/adapters/market-api-adapter.ts @@ -197,15 +197,15 @@ export class MarketApiAdapter implements IMarketApi { } } - private buildDemandRequestBody(decorations: DemandBodyPrototype): DemandRequestBody { + private buildDemandRequestBody(decorations: Partial): DemandRequestBody { let constraints: string; - if (!decorations.constraints.length) constraints = "(&)"; + if (!decorations.constraints?.length) constraints = "(&)"; else if (decorations.constraints.length == 1) constraints = decorations.constraints[0]; else constraints = `(&${decorations.constraints.join("\n\t")})`; const properties: Record = {}; - decorations.properties.forEach((prop) => (properties[prop.key] = prop.value)); + decorations.properties?.forEach((prop) => (properties[prop.key] = prop.value)); return { constraints, properties }; } From dce0848f73017d07632bf95286e00a0e38c139da Mon Sep 17 00:00:00 2001 From: Seweryn Kras Date: Fri, 14 Jun 2024 15:36:48 +0200 Subject: [PATCH 09/10] chore: typo in scan example --- examples/advanced/scan.ts | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/examples/advanced/scan.ts b/examples/advanced/scan.ts index cdcc749a0..00c1ea9fc 100644 --- a/examples/advanced/scan.ts +++ b/examples/advanced/scan.ts @@ -16,7 +16,7 @@ const scanOptions: ScanOptions = { capabilities: ["vpn"], minStorageGib: 16, }, - // let's only look at mainnet providers only + // let's look at mainnet providers only payment: { network: "polygon", }, From 7389850eacd508dc118a5543e33745e62dd3bfce Mon Sep 17 00:00:00 2001 From: Seweryn Kras Date: Mon, 1 Jul 2024 13:57:32 +0200 Subject: [PATCH 10/10] refactor: bump ya-ts-client and use auto-generated methods and types --- package-lock.json | 8 ++--- package.json | 2 +- src/market/scan/scanned-proposal.ts | 13 ++------ .../yagna/adapters/market-api-adapter.ts | 30 +++++++++---------- 4 files changed, 22 insertions(+), 31 deletions(-) diff --git a/package-lock.json b/package-lock.json index a9cc46c6e..037bbf19e 100644 --- a/package-lock.json +++ b/package-lock.json @@ -28,7 +28,7 @@ "tmp": "^0.2.2", "uuid": "^10.0.0", "ws": "^8.16.0", - "ya-ts-client": "^1.1.2" + "ya-ts-client": "^1.1.3" }, "devDependencies": { "@commitlint/cli": "^19.0.3", @@ -18980,9 +18980,9 @@ } }, "node_modules/ya-ts-client": { - "version": "1.1.2", - "resolved": "https://registry.npmjs.org/ya-ts-client/-/ya-ts-client-1.1.2.tgz", - "integrity": "sha512-rZ2YMs3ATOzkaRo7NXDuNZ9xhjxufD/FLFpEm88LqnrpNJcRa81ZXy7B0yKQ2PaOedaKWELPjwcF7RoaOp2N1A==", + "version": "1.1.3", + "resolved": "https://registry.npmjs.org/ya-ts-client/-/ya-ts-client-1.1.3.tgz", + "integrity": "sha512-30l0EAz8E1G0JbhNj8aNQF1/g9SVXYUebdVJ248jQEA8oOwxotcw2STvAZZPX84KhbKRe1W5H+rY5cnwTLzCLQ==", "engines": { "node": ">=18.0.0" } diff --git a/package.json b/package.json index 944d91524..1da82c402 100644 --- a/package.json +++ b/package.json @@ -77,7 +77,7 @@ "tmp": "^0.2.2", "uuid": "^10.0.0", "ws": "^8.16.0", - "ya-ts-client": "^1.1.2" + "ya-ts-client": "^1.1.3" }, "devDependencies": { "@commitlint/cli": "^19.0.3", diff --git a/src/market/scan/scanned-proposal.ts b/src/market/scan/scanned-proposal.ts index 2d6aa2231..c0797b28c 100644 --- a/src/market/scan/scanned-proposal.ts +++ b/src/market/scan/scanned-proposal.ts @@ -1,21 +1,14 @@ import { PricingInfo, ProposalProperties } from "../proposal"; import { GolemInternalError } from "../../shared/error/golem-error"; +import type { MarketApi } from "ya-ts-client"; -// Raw response from yagna -// TODO: add to ya-client -type ScannedOfferDTO = { - properties: ProposalProperties; - constraints: string; - offerId: string; - providerId: string; - timestamp: string; -}; +type ScannedOfferDTO = MarketApi.OfferDTO; export class ScannedOffer { constructor(private readonly model: ScannedOfferDTO) {} get properties(): ProposalProperties { - return this.model.properties; + return this.model.properties as ProposalProperties; } get constraints(): string { diff --git a/src/shared/yagna/adapters/market-api-adapter.ts b/src/shared/yagna/adapters/market-api-adapter.ts index f277f1df3..1d9caf8f9 100644 --- a/src/shared/yagna/adapters/market-api-adapter.ts +++ b/src/shared/yagna/adapters/market-api-adapter.ts @@ -403,24 +403,22 @@ export class MarketApiAdapter implements IMarketApi { public scan(spec: ScanSpecification): Observable { const ac = new AbortController(); return new Observable((observer) => { - this.yagnaApi.market.httpRequest - .request({ - body: { - ...this.buildDemandRequestBody(spec), - type: "offer", - }, - method: "POST", - url: "/scan", + this.yagnaApi.market + .beginScan({ + type: "offer", + ...this.buildDemandRequestBody(spec), }) .then((iterator) => { - const cleanupIterator = () => - this.yagnaApi.market.httpRequest.request({ - method: "DELETE", - url: `/scan/${iterator}`, - }); + if (typeof iterator !== "string") { + throw new Error(`Something went wrong while starting the scan, ${iterator.message}`); + } + return iterator; + }) + .then(async (iterator) => { + const cleanupIterator = () => this.yagnaApi.market.endScan(iterator); if (ac.signal.aborted) { - void cleanupIterator(); + await cleanupIterator(); return; } @@ -444,9 +442,9 @@ export class MarketApiAdapter implements IMarketApi { }); eventSource.addEventListener("error", (error) => observer.error(error)); - ac.signal.onabort = () => { + ac.signal.onabort = async () => { eventSource.close(); - void cleanupIterator(); + await cleanupIterator(); }; }) .catch((error) => {