diff --git a/ci/nightly/pipeline.yml b/ci/nightly/pipeline.yml index f4af1ef83c0ec..c90e2178b87df 100644 --- a/ci/nightly/pipeline.yml +++ b/ci/nightly/pipeline.yml @@ -378,7 +378,7 @@ steps: plugins: - ./ci/plugins/mzcompose: composition: platform-checks - args: [--scenario=RestartPostgresBackend, --execution-mode=oneatatime] + args: [--scenario=RestartCockroach, --execution-mode=oneatatime] - id: checks-oneatatime-restart-redpanda-debezium label: "Checks oneatatime + restart Redpanda & Debezium" @@ -448,7 +448,7 @@ steps: plugins: - ./ci/plugins/mzcompose: composition: platform-checks - args: [--scenario=RestartPostgresBackend, --execution-mode=parallel] + args: [--scenario=RestartCockroach, --execution-mode=parallel] - id: checks-parallel-restart-redpanda label: "Checks parallel + restart Redpanda & Debezium" diff --git a/ci/test/pipeline.template.yml b/ci/test/pipeline.template.yml index 12b8898e27313..666063400eea1 100644 --- a/ci/test/pipeline.template.yml +++ b/ci/test/pipeline.template.yml @@ -508,8 +508,8 @@ steps: composition: platform-checks args: [--scenario=KillClusterdStorage] - - id: checks-restart-postgres-backend - label: "Checks + restart Postgres backend" + - id: checks-restart-cockroach + label: "Checks + restart Cockroach" depends_on: build-x86_64 inputs: [misc/python/materialize/checks] timeout_in_minutes: 30 @@ -518,7 +518,7 @@ steps: plugins: - ./ci/plugins/mzcompose: composition: platform-checks - args: [--scenario=RestartPostgresBackend] + args: [--scenario=RestartCockroach] - id: checks-restart-source-postgres label: "Checks + restart source Postgres" diff --git a/misc/cockroach/setup_materialize.sql b/misc/cockroach/setup_materialize.sql new file mode 100644 index 0000000000000..d88bb2f04cd4b --- /dev/null +++ b/misc/cockroach/setup_materialize.sql @@ -0,0 +1,20 @@ +-- Copyright Materialize, Inc. and contributors. All rights reserved. +-- +-- Use of this software is governed by the Business Source License +-- included in the LICENSE file at the root of this repository. +-- +-- As of the Change Date specified in that file, in accordance with +-- the Business Source License, use of this software will be governed +-- by the Apache License, Version 2.0. + +-- Sets up a CockroachDB cluster for use by Materialize. + +-- See: https://github.com/cockroachdb/cockroach/issues/93892 +-- See: https://github.com/MaterializeInc/materialize/issues/16726 +-- TODO: remove this workaround before upgrading to CockroachDB 22.2 in +-- production. +SET CLUSTER SETTING sql.stats.forecasts.enabled = false; + +CREATE SCHEMA consensus; +CREATE SCHEMA adapter; +CREATE SCHEMA storage; diff --git a/misc/dbt-materialize/mzcompose.py b/misc/dbt-materialize/mzcompose.py index 28f71059315f4..a060cf19ce459 100644 --- a/misc/dbt-materialize/mzcompose.py +++ b/misc/dbt-materialize/mzcompose.py @@ -11,17 +11,15 @@ from typing import Dict, List, Optional from materialize.mzcompose import Composition, WorkflowArgumentParser -from materialize.mzcompose.services import Materialized, Redpanda, Service, TestCerts +from materialize.mzcompose.services import Materialized, Redpanda, Service SERVICES = [ - TestCerts(), Materialized(), Redpanda(), Service( "dbt-test", { "mzbuild": "dbt-materialize", - "depends_on": ["test-certs"], "environment": [ "TMPDIR=/share/tmp", ], @@ -63,7 +61,6 @@ def workflow_default(c: Composition, parser: WorkflowArgumentParser) -> None: materialized = Materialized( options=test_case.materialized_options, image=test_case.materialized_image, - depends_on=["test-certs"], volumes_extra=["secrets:/secrets"], ) diff --git a/misc/python/materialize/checks/debezium.py b/misc/python/materialize/checks/debezium.py index 478a1dfc5445d..065e568817aa2 100644 --- a/misc/python/materialize/checks/debezium.py +++ b/misc/python/materialize/checks/debezium.py @@ -19,7 +19,7 @@ def initialize(self) -> Testdrive: return Testdrive( dedent( """ - $ postgres-execute connection=postgres://postgres:postgres@postgres-source + $ postgres-execute connection=postgres://postgres:postgres@postgres CREATE TABLE debezium_table (f1 TEXT, f2 INTEGER, f3 INTEGER, f4 TEXT, PRIMARY KEY (f1, f2)); ALTER TABLE debezium_table REPLICA IDENTITY FULL; INSERT INTO debezium_table SELECT 'A', generate_series, 1, REPEAT('X', 16) FROM generate_series(1,1000); @@ -29,7 +29,7 @@ def initialize(self) -> Testdrive: "name": "psql-connector", "config": { "connector.class": "io.debezium.connector.postgresql.PostgresConnector", - "database.hostname": "postgres-source", + "database.hostname": "postgres", "database.port": "5432", "database.user": "postgres", "database.password": "postgres", @@ -59,7 +59,7 @@ def initialize(self) -> Testdrive: FORMAT AVRO USING CONFLUENT SCHEMA REGISTRY CONNECTION csr_conn ENVELOPE DEBEZIUM; - $ postgres-execute connection=postgres://postgres:postgres@postgres-source + $ postgres-execute connection=postgres://postgres:postgres@postgres INSERT INTO debezium_table SELECT 'B', generate_series, 1, REPEAT('X', 16) FROM generate_series(1,1000); > CREATE MATERIALIZED VIEW debezium_view1 AS SELECT f1, f3, SUM(LENGTH(f4)) FROM debezium_source1 GROUP BY f1, f3; @@ -76,7 +76,7 @@ def manipulate(self) -> List[Testdrive]: Testdrive(dedent(s)) for s in [ """ - $ postgres-execute connection=postgres://postgres:postgres@postgres-source + $ postgres-execute connection=postgres://postgres:postgres@postgres BEGIN; INSERT INTO debezium_table SELECT 'C', generate_series, 1, REPEAT('X', 16) FROM generate_series(1,1000); UPDATE debezium_table SET f3 = f3 + 1; @@ -87,7 +87,7 @@ def manipulate(self) -> List[Testdrive]: FORMAT AVRO USING CONFLUENT SCHEMA REGISTRY CONNECTION csr_conn ENVELOPE DEBEZIUM; - $ postgres-execute connection=postgres://postgres:postgres@postgres-source + $ postgres-execute connection=postgres://postgres:postgres@postgres BEGIN; INSERT INTO debezium_table SELECT 'D', generate_series, 1, REPEAT('X', 16) FROM generate_series(1,1000); UPDATE debezium_table SET f3 = f3 + 1; @@ -96,7 +96,7 @@ def manipulate(self) -> List[Testdrive]: > CREATE MATERIALIZED VIEW debezium_view2 AS SELECT f1, f3, SUM(LENGTH(f4)) FROM debezium_source2 GROUP BY f1, f3; """, """ - $ postgres-execute connection=postgres://postgres:postgres@postgres-source + $ postgres-execute connection=postgres://postgres:postgres@postgres BEGIN; INSERT INTO debezium_table SELECT 'E', generate_series, 1, REPEAT('X', 16) FROM generate_series(1,1000); UPDATE debezium_table SET f3 = f3 + 1; @@ -107,7 +107,7 @@ def manipulate(self) -> List[Testdrive]: FORMAT AVRO USING CONFLUENT SCHEMA REGISTRY CONNECTION csr_conn ENVELOPE DEBEZIUM; - $ postgres-execute connection=postgres://postgres:postgres@postgres-source + $ postgres-execute connection=postgres://postgres:postgres@postgres BEGIN; INSERT INTO debezium_table SELECT 'F', generate_series, 1, REPEAT('X', 16) FROM generate_series(1,1000); UPDATE debezium_table SET f3 = f3 + 1; diff --git a/misc/python/materialize/checks/mzcompose_actions.py b/misc/python/materialize/checks/mzcompose_actions.py index 0e1ba8d6a5885..4e911ac522bce 100644 --- a/misc/python/materialize/checks/mzcompose_actions.py +++ b/misc/python/materialize/checks/mzcompose_actions.py @@ -27,12 +27,6 @@ class MzcomposeAction(Action): class StartMz(MzcomposeAction): - DEFAULT_MZ_OPTIONS = [ - "--persist-consensus-url=postgresql://postgres:postgres@postgres-backend:5432?options=--search_path=consensus", - "--storage-stash-url=postgresql://postgres:postgres@postgres-backend:5432?options=--search_path=storage", - "--adapter-stash-url=postgresql://postgres:postgres@postgres-backend:5432?options=--search_path=adapter", - ] - def __init__( self, tag: Optional[str] = None, environment_extra: List[str] = [] ) -> None: @@ -46,7 +40,7 @@ def execute(self, e: Executor) -> None: print(f"Starting Mz using image {image}") mz = Materialized( image=image, - options=StartMz.DEFAULT_MZ_OPTIONS, + external_cockroach=True, environment_extra=self.environment_extra, ) @@ -134,22 +128,21 @@ def execute(self, e: Executor) -> None: c.start_and_wait_for_tcp(services=[service]) -class RestartPostgresBackend(MzcomposeAction): +class RestartCockroach(MzcomposeAction): def execute(self, e: Executor) -> None: c = e.mzcompose_composition() - c.kill("postgres-backend") - c.up("postgres-backend") - c.wait_for_postgres(service="postgres-backend") + c.kill("cockroach") + c.up("cockroach") class RestartSourcePostgres(MzcomposeAction): def execute(self, e: Executor) -> None: c = e.mzcompose_composition() - c.kill("postgres-source") - c.up("postgres-source") - c.wait_for_postgres(service="postgres-source") + c.kill("postgres") + c.up("postgres") + c.wait_for_postgres(service="postgres") class KillClusterdStorage(MzcomposeAction): diff --git a/misc/python/materialize/checks/pg_cdc.py b/misc/python/materialize/checks/pg_cdc.py index 64a06a3d316ee..e4a83c2ebab55 100644 --- a/misc/python/materialize/checks/pg_cdc.py +++ b/misc/python/materialize/checks/pg_cdc.py @@ -22,12 +22,12 @@ def initialize(self) -> Testdrive: > CREATE SECRET pgpass1 AS 'postgres'; > CREATE CONNECTION pg1 FOR POSTGRES - HOST 'postgres-source', + HOST 'postgres', DATABASE postgres, USER postgres1, PASSWORD SECRET pgpass1 - $ postgres-execute connection=postgres://postgres:postgres@postgres-source + $ postgres-execute connection=postgres://postgres:postgres@postgres CREATE USER postgres1 WITH SUPERUSER PASSWORD 'postgres'; ALTER USER postgres1 WITH replication; DROP PUBLICATION IF EXISTS postgres_source; @@ -54,25 +54,25 @@ def manipulate(self) -> List[Testdrive]: (PUBLICATION 'postgres_source') FOR TABLES (postgres_source_table AS postgres_source_tableA); - $ postgres-execute connection=postgres://postgres:postgres@postgres-source + $ postgres-execute connection=postgres://postgres:postgres@postgres INSERT INTO postgres_source_table SELECT 'B', 1, REPEAT('X', 1024) FROM generate_series(1,100); UPDATE postgres_source_table SET f2 = f2 + 1; > CREATE SECRET pgpass2 AS 'postgres'; > CREATE CONNECTION pg2 FOR POSTGRES - HOST 'postgres-source', + HOST 'postgres', DATABASE postgres, USER postgres1, PASSWORD SECRET pgpass1 - $ postgres-execute connection=postgres://postgres:postgres@postgres-source + $ postgres-execute connection=postgres://postgres:postgres@postgres INSERT INTO postgres_source_table SELECT 'C', 1, REPEAT('X', 1024) FROM generate_series(1,100); UPDATE postgres_source_table SET f2 = f2 + 1; """, """ - $ postgres-execute connection=postgres://postgres:postgres@postgres-source + $ postgres-execute connection=postgres://postgres:postgres@postgres INSERT INTO postgres_source_table SELECT 'D', 1, REPEAT('X', 1024) FROM generate_series(1,100); UPDATE postgres_source_table SET f2 = f2 + 1; @@ -81,18 +81,18 @@ def manipulate(self) -> List[Testdrive]: (PUBLICATION 'postgres_source') FOR TABLES (postgres_source_table AS postgres_source_tableB); - $ postgres-execute connection=postgres://postgres:postgres@postgres-source + $ postgres-execute connection=postgres://postgres:postgres@postgres INSERT INTO postgres_source_table SELECT 'E', 1, REPEAT('X', 1024) FROM generate_series(1,100); UPDATE postgres_source_table SET f2 = f2 + 1; - $ postgres-execute connection=postgres://postgres:postgres@postgres-source + $ postgres-execute connection=postgres://postgres:postgres@postgres INSERT INTO postgres_source_table SELECT 'F', 1, REPEAT('X', 1024) FROM generate_series(1,100); UPDATE postgres_source_table SET f2 = f2 + 1; > CREATE SECRET pgpass3 AS 'postgres'; > CREATE CONNECTION pg3 FOR POSTGRES - HOST 'postgres-source', + HOST 'postgres', DATABASE postgres, USER postgres1, PASSWORD SECRET pgpass3 @@ -102,12 +102,12 @@ def manipulate(self) -> List[Testdrive]: (PUBLICATION 'postgres_source') FOR TABLES (postgres_source_table AS postgres_source_tableC); - $ postgres-execute connection=postgres://postgres:postgres@postgres-source + $ postgres-execute connection=postgres://postgres:postgres@postgres INSERT INTO postgres_source_table SELECT 'G', 1, REPEAT('X', 1024) FROM generate_series(1,100); UPDATE postgres_source_table SET f2 = f2 + 1; - $ postgres-execute connection=postgres://postgres:postgres@postgres-source + $ postgres-execute connection=postgres://postgres:postgres@postgres INSERT INTO postgres_source_table SELECT 'H', 1, REPEAT('X', 1024) FROM generate_series(1,100); UPDATE postgres_source_table SET f2 = f2 + 1; """, @@ -160,12 +160,12 @@ def initialize(self) -> Testdrive: > CREATE SECRET postgres_mz_now_pass AS 'postgres'; > CREATE CONNECTION postgres_mz_now_conn FOR POSTGRES - HOST 'postgres-source', + HOST 'postgres', DATABASE postgres, USER postgres2, PASSWORD SECRET postgres_mz_now_pass - $ postgres-execute connection=postgres://postgres:postgres@postgres-source + $ postgres-execute connection=postgres://postgres:postgres@postgres CREATE USER postgres2 WITH SUPERUSER PASSWORD 'postgres'; ALTER USER postgres2 WITH replication; DROP PUBLICATION IF EXISTS postgres_mz_now_publication; @@ -201,7 +201,7 @@ def manipulate(self) -> List[Testdrive]: Testdrive(dedent(s)) for s in [ """ - $ postgres-execute connection=postgres://postgres:postgres@postgres-source + $ postgres-execute connection=postgres://postgres:postgres@postgres INSERT INTO postgres_mz_now_table VALUES (NOW(), 'A2'); INSERT INTO postgres_mz_now_table VALUES (NOW(), 'B2'); INSERT INTO postgres_mz_now_table VALUES (NOW(), 'C2'); @@ -211,7 +211,7 @@ def manipulate(self) -> List[Testdrive]: UPDATE postgres_mz_now_table SET f1 = NOW() WHERE f2 = 'C1'; """, """ - $ postgres-execute connection=postgres://postgres:postgres@postgres-source + $ postgres-execute connection=postgres://postgres:postgres@postgres INSERT INTO postgres_mz_now_table VALUES (NOW(), 'A3'); INSERT INTO postgres_mz_now_table VALUES (NOW(), 'B3'); INSERT INTO postgres_mz_now_table VALUES (NOW(), 'C3'); @@ -230,7 +230,7 @@ def validate(self) -> Testdrive: > SELECT COUNT(*) FROM postgres_mz_now_table; 13 - $ postgres-execute connection=postgres://postgres:postgres@postgres-source + $ postgres-execute connection=postgres://postgres:postgres@postgres INSERT INTO postgres_mz_now_table VALUES (NOW(), 'A4'); INSERT INTO postgres_mz_now_table VALUES (NOW(), 'B4'); INSERT INTO postgres_mz_now_table VALUES (NOW(), 'C4'); @@ -250,7 +250,7 @@ def validate(self) -> Testdrive: 0 # Rollback the last INSERTs so that validate() can be called multiple times - $ postgres-execute connection=postgres://postgres:postgres@postgres-source + $ postgres-execute connection=postgres://postgres:postgres@postgres INSERT INTO postgres_mz_now_table VALUES (NOW(), 'B3'); DELETE FROM postgres_mz_now_table WHERE f2 LIKE '%4%'; """ diff --git a/misc/python/materialize/checks/scenarios.py b/misc/python/materialize/checks/scenarios.py index 9b71017eed2a3..248222b58cdf5 100644 --- a/misc/python/materialize/checks/scenarios.py +++ b/misc/python/materialize/checks/scenarios.py @@ -29,7 +29,7 @@ ) from materialize.checks.mzcompose_actions import KillMz from materialize.checks.mzcompose_actions import ( - RestartPostgresBackend as RestartPostgresBackendAction, + RestartCockroach as RestartCockroachAction, ) from materialize.checks.mzcompose_actions import ( RestartRedpandaDebezium as RestartRedpandaDebeziumAction, @@ -155,16 +155,16 @@ def actions(self) -> List[Action]: ] -class RestartPostgresBackend(Scenario): +class RestartCockroach(Scenario): def actions(self) -> List[Action]: return [ StartMz(), Initialize(self.checks), - RestartPostgresBackendAction(), + RestartCockroachAction(), Manipulate(self.checks, phase=1), - RestartPostgresBackendAction(), + RestartCockroachAction(), Manipulate(self.checks, phase=2), - RestartPostgresBackendAction(), + RestartCockroachAction(), Validate(self.checks), ] diff --git a/misc/python/materialize/checks/source_errors.py b/misc/python/materialize/checks/source_errors.py index c23f8f4d15820..86db6953fbff1 100644 --- a/misc/python/materialize/checks/source_errors.py +++ b/misc/python/materialize/checks/source_errors.py @@ -22,12 +22,12 @@ def initialize(self) -> Testdrive: > CREATE SECRET source_errors_secret AS 'postgres'; > CREATE CONNECTION source_errors_connection FOR POSTGRES - HOST 'postgres-source', + HOST 'postgres', DATABASE postgres, USER source_errors_user1, PASSWORD SECRET source_errors_secret - $ postgres-execute connection=postgres://postgres:postgres@postgres-source + $ postgres-execute connection=postgres://postgres:postgres@postgres # In order to avoid conflicts, user must be unique CREATE USER source_errors_user1 WITH SUPERUSER PASSWORD 'postgres'; ALTER USER source_errors_user1 WITH replication; @@ -54,7 +54,7 @@ def initialize(self) -> Testdrive: (PUBLICATION 'source_errors_publicationb') /* all lowercase */ FOR TABLES (source_errors_table AS source_errors_tableB) - $ postgres-execute connection=postgres://postgres:postgres@postgres-source + $ postgres-execute connection=postgres://postgres:postgres@postgres INSERT INTO source_errors_table VALUES (2); > SELECT COUNT(*) FROM source_errors_tableA; @@ -72,12 +72,12 @@ def manipulate(self) -> List[Testdrive]: Testdrive(dedent(s)) for s in [ """ - $ postgres-execute connection=postgres://postgres:postgres@postgres-source + $ postgres-execute connection=postgres://postgres:postgres@postgres DROP PUBLICATION source_errors_publicationA; INSERT INTO source_errors_table VALUES (3); """, """ - $ postgres-execute connection=postgres://postgres:postgres@postgres-source + $ postgres-execute connection=postgres://postgres:postgres@postgres DROP PUBLICATION source_errors_publicationB; INSERT INTO source_errors_table VALUES (4); """, diff --git a/misc/python/materialize/cloudtest/k8s/cockroach.py b/misc/python/materialize/cloudtest/k8s/cockroach.py index 84159a35e63d3..b0b4be4e7dc14 100644 --- a/misc/python/materialize/cloudtest/k8s/cockroach.py +++ b/misc/python/materialize/cloudtest/k8s/cockroach.py @@ -7,8 +7,6 @@ # the Business Source License, use of this software will be governed # by the Apache License, Version 2.0. -from textwrap import dedent - from kubernetes.client import ( V1ConfigMap, V1ConfigMapVolumeSource, @@ -30,6 +28,7 @@ V1VolumeMount, ) +from materialize import ROOT from materialize.cloudtest.k8s import K8sConfigMap, K8sService, K8sStatefulSet @@ -40,13 +39,9 @@ def __init__(self) -> None: name="cockroach-init", ), data={ - "schemas.sql": dedent( - """ - CREATE SCHEMA consensus; - CREATE SCHEMA catalog; - CREATE SCHEMA storage; - """ - ), + "setup_materialize.sql": ( + ROOT / "misc" / "cockroach" / "setup_materialize.sql" + ).read_text(), }, ) diff --git a/misc/python/materialize/cloudtest/k8s/environmentd.py b/misc/python/materialize/cloudtest/k8s/environmentd.py index 1287e548e2500..8f190b6fa686b 100644 --- a/misc/python/materialize/cloudtest/k8s/environmentd.py +++ b/misc/python/materialize/cloudtest/k8s/environmentd.py @@ -109,7 +109,7 @@ def generate_stateful_set(self) -> V1StatefulSet: "--orchestrator=kubernetes", "--orchestrator-kubernetes-image-pull-policy=if-not-present", "--persist-consensus-url=postgres://root@cockroach.default:26257?options=--search_path=consensus", - "--adapter-stash-url=postgres://root@cockroach.default:26257?options=--search_path=catalog", + "--adapter-stash-url=postgres://root@cockroach.default:26257?options=--search_path=adapter", "--storage-stash-url=postgres://root@cockroach.default:26257?options=--search_path=storage", "--internal-sql-listen-addr=0.0.0.0:6877", "--unsafe-mode", diff --git a/misc/python/materialize/mzcompose/__init__.py b/misc/python/materialize/mzcompose/__init__.py index a6672ae8d6283..3d70caa6eebe6 100644 --- a/misc/python/materialize/mzcompose/__init__.py +++ b/misc/python/materialize/mzcompose/__init__.py @@ -54,6 +54,7 @@ from pg8000 import Cursor from materialize import mzbuild, spawn, ui +from materialize.mzcompose import loader from materialize.ui import UIError T = TypeVar("T") @@ -108,7 +109,9 @@ def __init__( assert spec module = importlib.util.module_from_spec(spec) assert isinstance(spec.loader, importlib.abc.Loader) + loader.composition_path = self.path spec.loader.exec_module(module) + loader.composition_path = None self.description = inspect.getdoc(module) for name, fn in getmembers(module, isfunction): if name.startswith("workflow_"): @@ -289,14 +292,18 @@ def workflow(self, name: str, *args: str) -> None: ui.header(f"Running workflow {name}") func = self.workflows[name] parser = WorkflowArgumentParser(name, inspect.getdoc(func), list(args)) - if len(inspect.signature(func).parameters) > 1: - func(self, parser) - else: - # If the workflow doesn't have an `args` parameter, parse them here - # with an empty parser to reject bogus arguments and to handle the - # trivial help message. - parser.parse_args() - func(self) + try: + loader.composition_path = self.path + if len(inspect.signature(func).parameters) > 1: + func(self, parser) + else: + # If the workflow doesn't have an `args` parameter, parse them here + # with an empty parser to reject bogus arguments and to handle the + # trivial help message. + parser.parse_args() + func(self) + finally: + loader.composition_path = None @contextmanager def override(self, *services: "Service") -> Iterator[None]: @@ -533,7 +540,13 @@ def pull_if_variable(self, services: List[str]) -> None: ): self.invoke("pull", service) - def up(self, *services: str, detach: bool = True, persistent: bool = False) -> None: + def up( + self, + *services: str, + detach: bool = True, + wait: bool = True, + persistent: bool = False, + ) -> None: """Build, (re)create, and start the named services. Delegates to `docker compose up`. See that command's help for details. @@ -541,6 +554,8 @@ def up(self, *services: str, detach: bool = True, persistent: bool = False) -> N Args: services: The names of services in the composition. detach: Run containers in the background. + wait: Wait for health checks to complete before returning. + Implies `detach` mode. persistent: Replace the container's entrypoint and command with `sleep infinity` so that additional commands can be scheduled on the container with `Composition.exec`. @@ -552,7 +567,12 @@ def up(self, *services: str, detach: bool = True, persistent: bool = False) -> N service["command"] = [] self._write_compose() - self.invoke("up", *(["--detach"] if detach else []), *services) + self.invoke( + "up", + *(["--detach"] if detach else []), + *(["--wait"] if wait else []), + *services, + ) if persistent: self.compose = old_compose @@ -789,6 +809,37 @@ def testdrive( self.run(service, *args, stdin=input) +class ServiceHealthcheck(TypedDict, total=False): + """Configuration for a check to determine whether the containers for this + service are healthy.""" + + test: Union[List[str], str] + """A specification of a command to run.""" + + interval: str + """The interval at which to run the healthcheck.""" + + timeout: str + """The maximum amount of time that the test command can run before it + is considered failed.""" + + retries: int + """The number of consecutive healthchecks that must fail for the container + to be considered unhealthy.""" + + start_period: str + """The period after container start during which failing healthchecks will + not be counted towards the retry limit.""" + + +class ServiceDependency(TypedDict, total=False): + """Configuration for a check to determine whether the containers for this + service are healthy.""" + + condition: str + """Condition under which a dependency is considered satisfied.""" + + class ServiceConfig(TypedDict, total=False): """The definition of a service in Docker Compose. @@ -862,7 +913,7 @@ class ServiceConfig(TypedDict, total=False): TODO(benesch): this should accept a `Dict[str, str]` instead. """ - depends_on: List[str] + depends_on: Union[List[str], Dict[str, ServiceDependency]] """The list of other services that must be started before this one.""" tmpfs: List[str] @@ -889,6 +940,10 @@ class ServiceConfig(TypedDict, total=False): working_dir: str """Overrides the container's working directory.""" + healthcheck: ServiceHealthcheck + """Configuration for a check to determine whether the containers for this + service are healthy.""" + class Service: """A Docker Compose service in a `Composition`. diff --git a/misc/python/materialize/mzcompose/loader.py b/misc/python/materialize/mzcompose/loader.py new file mode 100644 index 0000000000000..14d6d84e87150 --- /dev/null +++ b/misc/python/materialize/mzcompose/loader.py @@ -0,0 +1,14 @@ +# Copyright Materialize, Inc. and contributors. All rights reserved. +# +# Use of this software is governed by the Business Source License +# included in the LICENSE file at the root of this repository. +# +# As of the Change Date specified in that file, in accordance with +# the Business Source License, use of this software will be governed +# by the Apache License, Version 2.0. + +from pathlib import Path +from typing import Optional + +# The path of the composition that is currently being loaded. +composition_path: Optional[Path] = None diff --git a/misc/python/materialize/mzcompose/services.py b/misc/python/materialize/mzcompose/services.py index 6f19095453dee..e14b753efe27b 100644 --- a/misc/python/materialize/mzcompose/services.py +++ b/misc/python/materialize/mzcompose/services.py @@ -11,7 +11,8 @@ import random from typing import Dict, List, Optional, Tuple, Union -from materialize.mzcompose import Service, ServiceConfig +from materialize import ROOT +from materialize.mzcompose import Service, ServiceConfig, ServiceDependency, loader DEFAULT_CONFLUENT_PLATFORM_VERSION = "7.0.5" @@ -43,14 +44,20 @@ def __init__( image: Optional[str] = None, environment_extra: List[str] = [], volumes_extra: List[str] = [], - depends_on: Optional[List[str]] = None, + depends_on: List[str] = [], memory: Optional[str] = None, options: List[str] = [], persist_blob_url: Optional[str] = None, default_size: int = Size.DEFAULT_SIZE, environment_id: Optional[str] = None, propagate_crashes: bool = True, + external_cockroach: bool = False, + external_minio: bool = False, ) -> None: + depends_on: Dict[str, ServiceDependency] = { + s: {"condition": "service_started"} for s in depends_on + } + environment = [ "MZ_SOFT_ASSERTIONS=1", # TODO(benesch): remove the following environment variables @@ -84,6 +91,10 @@ def __init__( environment_id = DEFAULT_MZ_ENVIRONMENT_ID command += [f"--environment-id={environment_id}"] + if external_minio: + depends_on["minio"] = {"condition": "service_healthy"} + persist_blob_url = "s3://minioadmin:minioadmin@persist/persist?endpoint=http://minio:9000/®ion=minio" + if persist_blob_url: command.append(f"--persist-blob-url={persist_blob_url}") @@ -102,6 +113,14 @@ def __init__( f"--default-storage-host-size={self.default_storage_size}", ] + if external_cockroach: + depends_on["cockroach"] = {"condition": "service_healthy"} + command += [ + "--adapter-stash-url=postgres://root@cockroach:26257?options=--search_path=adapter", + "--storage-stash-url=postgres://root@cockroach:26257?options=--search_path=storage", + "--persist-consensus-url=postgres://root@cockroach:26257?options=--search_path=consensus", + ] + command += options config: ServiceConfig = {} @@ -119,7 +138,7 @@ def __init__( config.update( { - "depends_on": depends_on or [], + "depends_on": depends_on, "command": command, "ports": [6875, 6876, 6877, 6878, 26257], "environment": environment, @@ -372,14 +391,29 @@ class Cockroach(Service): def __init__( self, name: str = "cockroach", + setup_materialize: bool = False, ): + volumes = [] + if setup_materialize: + path = os.path.relpath( + ROOT / "misc" / "cockroach" / "setup_materialize.sql", + loader.composition_path, + ) + volumes += [f"{path}:/docker-entrypoint-initdb.d/setup_materialize.sql"] super().__init__( - name="cockroach", + name=name, config={ "image": "cockroachdb/cockroach:v22.2.0", "ports": [26257], "command": ["start-single-node", "--insecure"], - "volumes": ["/cockroach/cockroach-data"], + "volumes": volumes, + "init": True, + "healthcheck": { + "test": "[ -f init_success ] && curl --fail http://localhost:8080/health", + "timeout": "5s", + "interval": "1s", + "start_period": "30s", + }, }, ) @@ -531,29 +565,36 @@ def __init__( self, name: str = "minio", image: str = f"minio/minio:RELEASE.2022-09-25T15-44-53Z.fips", + setup_materialize: bool = False, ) -> None: + # We can pre-create buckets in minio by creating subdirectories in + # /data. A bit gross to do this via a shell command, but it's net + # less complicated than using a separate setup container that runs `mc`. + command = "minio server /data --console-address :9001" + if setup_materialize: + command = f"mkdir -p /data/persist && {command}" super().__init__( name=name, config={ - "command": ["minio", "server", "/data", "--console-address", ":9001"], + "entrypoint": ["sh", "-c"], + "command": [command], "image": image, "ports": [9000, 9001], + "healthcheck": { + "test": [ + "CMD", + "curl", + "--fail", + "http://localhost:9000/minio/health/live", + ], + "timeout": "5s", + "interval": "1s", + "start_period": "30s", + }, }, ) -class MinioMc(Service): - def __init__( - self, - name: str = "minio_mc", - image: str = f"minio/mc:RELEASE.2022-09-16T09-16-47Z", - ) -> None: - super().__init__( - name=name, - config={"image": image}, - ) - - class Testdrive(Service): def __init__( self, @@ -671,6 +712,9 @@ def __init__( super().__init__( name="test-certs", config={ + # Container must stay alive indefinitely to be considered + # healthy by `docker compose up --wait`. + "command": ["sleep", "infinity"], "mzbuild": "test-certs", "volumes": ["secrets:/secrets"], }, diff --git a/misc/python/materialize/zippy/crdb_actions.py b/misc/python/materialize/zippy/crdb_actions.py index 6d4f2c9f3c8cf..7cb9f03bc4210 100644 --- a/misc/python/materialize/zippy/crdb_actions.py +++ b/misc/python/materialize/zippy/crdb_actions.py @@ -19,21 +19,7 @@ class CockroachStart(Action): """Starts a CockroachDB instance.""" def run(self, c: Composition) -> None: - c.start_and_wait_for_tcp(services=["cockroach"]) - c.wait_for_cockroach() - - for schema in ["adapter", "storage", "consensus"]: - c.sql( - f"CREATE SCHEMA IF NOT EXISTS {schema}", - service="cockroach", - user="root", - ) - - c.sql( - "SET CLUSTER SETTING sql.stats.forecasts.enabled = false", - service="cockroach", - user="root", - ) + c.up("cockroach") def provides(self) -> List[Capability]: return [CockroachIsRunning()] diff --git a/misc/python/materialize/zippy/minio_actions.py b/misc/python/materialize/zippy/minio_actions.py index 0873eff3fc5d3..d37c804130132 100644 --- a/misc/python/materialize/zippy/minio_actions.py +++ b/misc/python/materialize/zippy/minio_actions.py @@ -19,26 +19,7 @@ class MinioStart(Action): """Starts a Minio instance.""" def run(self, c: Composition) -> None: - c.start_and_wait_for_tcp(services=["minio"]) - - # Minio is managed using a dedicated container - c.up("minio_mc", persistent=True) - - # Create user - c.exec( - "minio_mc", - "mc", - "config", - "host", - "add", - "myminio", - "http://minio:9000", - "minioadmin", - "minioadmin", - ) - - # Create bucket - c.exec("minio_mc", "mc", "mb", "myminio/persist"), + c.up("minio") def provides(self) -> List[Capability]: return [MinioIsRunning()] diff --git a/test/cluster/mzcompose.py b/test/cluster/mzcompose.py index e837ac31fc246..0dcae0550a91e 100644 --- a/test/cluster/mzcompose.py +++ b/test/cluster/mzcompose.py @@ -547,10 +547,8 @@ def workflow_test_remote_storage(c: Composition) -> None: # Use a separate CockroachDB service for persist rather than the one in # the `Materialized` service, so that crashing `environmentd` does not # also take down CockroachDB. - Cockroach(), - Materialized( - options=["--persist-consensus-url=postgres://root@cockroach:26257"] - ), + Cockroach(setup_materialize=True), + Materialized(external_cockroach=True), ): dependencies = [ "materialized", diff --git a/test/mzcompose_examples/mzcompose.py b/test/mzcompose_examples/mzcompose.py index 3a87619e26095..f77deb43f50c5 100644 --- a/test/mzcompose_examples/mzcompose.py +++ b/test/mzcompose_examples/mzcompose.py @@ -12,7 +12,6 @@ Kafka, Materialized, Minio, - MinioMc, SchemaRegistry, Testdrive, Zookeeper, @@ -32,8 +31,7 @@ ] SERVICES = [ - Minio(), - MinioMc(), + Minio(setup_materialize=True), Zookeeper(), Kafka(), SchemaRegistry(), @@ -80,24 +78,8 @@ def workflow_mz_with_options(c: Composition) -> None: def workflow_minio(c: Composition) -> None: - mz = Materialized( - persist_blob_url="s3://minioadmin:minioadmin@persist/persist?endpoint=http://minio:9000/®ion=minio" - ) + mz = Materialized(external_minio=True) with c.override(mz): - c.start_and_wait_for_tcp(services=["minio"]) - c.up("minio_mc", persistent=True) - c.exec( - "minio_mc", - "mc", - "config", - "host", - "add", - "myminio", - "http://minio:9000", - "minioadmin", - "minioadmin", - ) - c.exec("minio_mc", "mc", "mb", "myminio/persist"), - c.start_and_wait_for_tcp(services=["materialized"]) + c.up("materialized") c.wait_for_materialized() diff --git a/test/platform-checks/mzcompose.py b/test/platform-checks/mzcompose.py index 6bde56b6eca9b..dc066dc4e2460 100644 --- a/test/platform-checks/mzcompose.py +++ b/test/platform-checks/mzcompose.py @@ -18,6 +18,7 @@ from materialize.mzcompose import Composition, WorkflowArgumentParser from materialize.mzcompose.services import ( Clusterd, + Cockroach, Debezium, Materialized, Postgres, @@ -26,14 +27,14 @@ from materialize.mzcompose.services import Testdrive as TestdriveService SERVICES = [ - Postgres(name="postgres-backend"), - Postgres(name="postgres-source"), + Cockroach(setup_materialize=True), + Postgres(), Redpanda(auto_create_topics=True), Debezium(), Clusterd( name="clusterd_compute_1" ), # Started by some Scenarios, defined here only for the teardown - Materialized(), + Materialized(external_cockroach=True), TestdriveService(default_timeout="300s", no_reset=True, seed=1), ] @@ -49,23 +50,10 @@ def __str__(self) -> str: def setup(c: Composition) -> None: c.up("testdrive", persistent=True) + c.up("cockroach") - c.start_and_wait_for_tcp( - services=["redpanda", "postgres-backend", "postgres-source", "debezium"] - ) - for postgres in ["postgres-backend", "postgres-source"]: - c.wait_for_postgres(service=postgres) - - c.sql( - sql=f""" - CREATE SCHEMA IF NOT EXISTS consensus; - CREATE SCHEMA IF NOT EXISTS storage; - CREATE SCHEMA IF NOT EXISTS adapter; - """, - service="postgres-backend", - user="postgres", - password="postgres", - ) + c.start_and_wait_for_tcp(services=["redpanda", "postgres", "debezium"]) + c.wait_for_postgres() def teardown(c: Composition) -> None: diff --git a/test/restart/mzcompose.py b/test/restart/mzcompose.py index 934b176b2d155..4631c488d93bb 100644 --- a/test/restart/mzcompose.py +++ b/test/restart/mzcompose.py @@ -26,7 +26,7 @@ Materialized(), Testdrive(), testdrive_no_reset, - Cockroach(), + Cockroach(setup_materialize=True), ] @@ -96,22 +96,8 @@ def workflow_stash(c: Composition) -> None: ) c.rm_volumes("mzdata", "pgdata", force=True) - materialized = Materialized( - options=[ - "--adapter-stash-url=postgres://root@cockroach:26257?options=--search_path=adapter", - "--storage-stash-url=postgres://root@cockroach:26257?options=--search_path=storage", - "--persist-consensus-url=postgres://root@cockroach:26257?options=--search_path=consensus", - ], - ) - cockroach = Cockroach() - - with c.override(materialized, cockroach): + with c.override(Materialized(external_cockroach=True)): c.up("cockroach") - c.wait_for_cockroach() - - c.sql("CREATE SCHEMA adapter", service="cockroach", user="root") - c.sql("CREATE SCHEMA storage", service="cockroach", user="root") - c.sql("CREATE SCHEMA consensus", service="cockroach", user="root") c.start_and_wait_for_tcp(services=["materialized"]) c.wait_for_materialized("materialized") @@ -120,13 +106,11 @@ def workflow_stash(c: Composition) -> None: c.stop("cockroach") c.up("cockroach") - c.wait_for_cockroach() c.sql("CREATE TABLE b (i INT)") c.rm("cockroach") c.up("cockroach") - c.wait_for_cockroach() # CockroachDB cleared its database, so this should fail. try: diff --git a/test/zippy/mzcompose.py b/test/zippy/mzcompose.py index 36b653f84634b..7147eee88fd33 100644 --- a/test/zippy/mzcompose.py +++ b/test/zippy/mzcompose.py @@ -18,7 +18,6 @@ Kafka, Materialized, Minio, - MinioMc, Postgres, SchemaRegistry, Testdrive, @@ -33,9 +32,8 @@ SchemaRegistry(), Debezium(), Postgres(), - Cockroach(), - Minio(), - MinioMc(), + Cockroach(setup_materialize=True), + Minio(setup_materialize=True), # Those two are overriden below Materialized(), Clusterd(name="storaged", storage_workers=4), @@ -95,14 +93,6 @@ def workflow_default(c: Composition, parser: WorkflowArgumentParser) -> None: random.seed(args.seed) - environment_extra = ["MZ_LOG_FILTER=warn"] - mz_options = [ - "--adapter-stash-url=postgres://root@cockroach:26257?options=--search_path=adapter", - "--storage-stash-url=postgres://root@cockroach:26257?options=--search_path=storage", - "--persist-consensus-url=postgres://root@cockroach:26257?options=--search_path=consensus", - ] - persist_blob_url = "s3://minioadmin:minioadmin@persist/persist?endpoint=http://minio:9000/®ion=minio" - with c.override( Testdrive( no_reset=True, @@ -114,16 +104,9 @@ def workflow_default(c: Composition, parser: WorkflowArgumentParser) -> None: }, ), Materialized( - environment_extra=environment_extra, - options=mz_options, - persist_blob_url=persist_blob_url, - ) - if args.size is None - else Materialized( - default_size=args.size, - environment_extra=environment_extra, - options=mz_options, - persist_blob_url=persist_blob_url, + default_size=args.size or Materialized.Size.DEFAULT_SIZE, + options=["--log-filter=warn"], + external_minio=True, ), ): c.up("testdrive", persistent=True)