Skip to content

Commit

Permalink
Merge pull request #163 from LibertyAces/feature/declarativeasab
Browse files Browse the repository at this point in the history
Migrate BSPUMP Declarative to ASAB library.
  • Loading branch information
PremyslCerny authored Feb 17, 2023
2 parents 05f68ea + 0fbfdf0 commit aff16e7
Show file tree
Hide file tree
Showing 5 changed files with 80 additions and 143 deletions.
8 changes: 0 additions & 8 deletions bspump/declarative/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,10 +7,6 @@
from .declerror import DeclarationError
from .segmentbuilder import SegmentBuilder

from .libraries import DeclarationLibrary
from .libraries import FileDeclarationLibrary
from .libraries import ZooKeeperDeclarationLibrary

from .abc import Expression
from .abc import SequenceExpression

Expand All @@ -27,10 +23,6 @@

"ExpressionOptimizer",

"DeclarationLibrary",
"FileDeclarationLibrary",
"ZooKeeperDeclarationLibrary",

"Expression",
"SequenceExpression",

Expand Down
79 changes: 61 additions & 18 deletions bspump/declarative/builder.py
Original file line number Diff line number Diff line change
@@ -1,9 +1,9 @@
import logging
import inspect
import asyncio

import yaml

from .libraries import FileDeclarationLibrary
from .declerror import DeclarationError
from .abc import Expression

Expand All @@ -25,17 +25,20 @@ class ExpressionBuilder(object):
Builds an expression from configuration.
"""

def __init__(self, app, libraries=None):
def __init__(self, app, library=None, include_paths=None):
self.App = app
self.ExpressionClasses = {}
self.Identifier = None

self.Config = {}
self.Library = library

if include_paths is None:
self.IncludePaths = ["/include"]

if libraries is None:
self.Libraries = [FileDeclarationLibrary()]
else:
self.Libraries = libraries
self.IncludePaths = include_paths
assert isinstance(include_paths, list)

# Register the common expression module
from . import expression
Expand All @@ -55,17 +58,22 @@ def add_config_value(self, key, value):
def update_config(self, config):
self.Config.update(config)

def read(self, identifier):
async def read(self, identifier):

if self.Library is None:
raise RuntimeError("Cannot read '{}' in builder, ASAB library is not provided".format(identifier))

# Read declaration from available declarations libraries
for declaration_library in self.Libraries:
declaration = declaration_library.read(identifier)
for include_path in self.IncludePaths:
declaration = await self.Library.read("{}/{}.yaml".format(include_path, identifier))

if declaration is not None:
return declaration
return declaration.read().decode("utf-8")

raise RuntimeError("Cannot find '{}' YAML declaration in libraries".format(identifier))


def parse(self, declaration, source_name=None):
async def parse(self, declaration, source_name=None):
"""
Returns a list of expressions from the loaded declaration.
:param declaration:
Expand All @@ -74,11 +82,12 @@ def parse(self, declaration, source_name=None):
"""

self.Identifier = None

if isinstance(declaration, str) and declaration.startswith('---'):
pass

else:
self.Identifier = declaration
declaration = self.read(self.Identifier)
declaration = await self.read(declaration)

loader = yaml.Loader(declaration)
if source_name is not None:
Expand Down Expand Up @@ -117,10 +126,15 @@ def parse(self, declaration, source_name=None):
# Build syntax trees for each expression
while loader.check_data():
expression = loader.get_data()
expression = await self._load_includes(expression)

# Run initialize for the expression and any instance inside
for parent, key, obj in self._walk(expression):

if isinstance(obj, str) and obj.startswith("<INCLUDE>"):
obj = (await self.parse(obj[9:], "<INCLUDE>"))[0]
setattr(parent, key, obj)

if not isinstance(obj, Expression):
continue

Expand All @@ -146,13 +160,13 @@ def parse(self, declaration, source_name=None):
return expressions


def parse_ext(self, declaration, source_name=None):
async def parse_ext(self, declaration, source_name=None):
'''
Wrap top-level declaration into a function, value etc.
This is likely intermediate (not a final) implementation.
'''
result = []
for expr in self.parse(declaration, source_name=source_name):
for expr in await self.parse(declaration, source_name=source_name):

if isinstance(expr, (VALUE, FUNCTION)):
result.append(expr)
Expand Down Expand Up @@ -192,6 +206,7 @@ def _walk(self, expression):
elif isinstance(expression, dict):

for _key, _expression in expression.items():

for parent, key, obj in self._walk(_expression):
yield (parent, key, obj)

Expand All @@ -208,13 +223,40 @@ def _walk(self, expression):
raise NotImplementedError("Walk not implemented for '{}'.".format(expression))


async def _load_includes(self, expression):

if isinstance(expression, Expression):
return expression

if isinstance(expression, str):

if expression.startswith("<INCLUDE>"):
expression = (await self.parse(expression[9:], "<INCLUDE>"))[0]

return expression

if isinstance(expression, dict):

for _key, _value in expression.items():
expression[_key] = await self._load_includes(_value)

return expression

if isinstance(expression, (list, set)):

for _n, _value in enumerate(expression):
expression[_n] = await self._load_includes(_value)

return expression

return expression


def _construct_include(self, loader: yaml.Loader, node: yaml.Node):
"""Include file referenced at node."""
"""Include will be done later using await."""

identifier = loader.construct_scalar(node)
declaration = self.read(identifier)
# Include can be only one expression
return self.parse(declaration, "<INCLUDE>")[0]
return "<INCLUDE>{}".format(identifier)


def _construct_config(self, loader: yaml.Loader, node: yaml.Node):
Expand All @@ -227,6 +269,7 @@ def _constructor(self, loader, node):
xclass = self.ExpressionClasses[node.tag[1:]]

location = node.start_mark

if self.Identifier is not None:
# https://github.com/yaml/pyyaml/blob/4c2e993321ad29a02a61c4818f3cef9229219003/lib3/yaml/reader.py
location = location.replace("<unicode string>", str(self.Identifier))
Expand Down
106 changes: 0 additions & 106 deletions bspump/declarative/libraries.py

This file was deleted.

14 changes: 9 additions & 5 deletions bspump/declarative/processor.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,12 +12,16 @@ def construct(cls, app, pipeline, definition: dict):
declaration = definition.get("declaration")
return cls(app, pipeline, declaration=declaration, id=_id, config=config)

def __init__(self, app, pipeline, declaration, libraries=None, id=None, config=None):
def __init__(self, app, pipeline, declaration, library=None, id=None, config=None):
super().__init__(app, pipeline, id=id, config=config)
builder = ExpressionBuilder(app, libraries)
optimizer = ExpressionOptimizer(app)
expressions = builder.parse(declaration)
self.Expressions = optimizer.optimize_many(expressions)
self.Declaration = declaration
self.Builder = ExpressionBuilder(app, library)
self.ExpressionOptimizer = ExpressionOptimizer(app)
self.Expressions = None

async def initialize(self):
expressions = await self.Builder.parse(self.Declaration)
self.Expressions = self.ExpressionOptimizer.optimize_many(expressions)

def process(self, context, event):
for expression in self.Expressions:
Expand Down
16 changes: 10 additions & 6 deletions examples/bspump-declarative.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,26 +28,30 @@ def __init__(self, app, pipeline_id=None):
file = open("./data/declarative-input.yml")
declaration = file.read()

self.DeclarativeProcessor = bspump.declarative.DeclarativeProcessor(app, self, declaration)

self.build(

bspump.random.RandomSource(app, self, choice=[
{"eggs": 2, "potatoes": 12, "carrots": 5, "garbage": "to be removed", "name": "xpotatoes", "meta": "Say,Good,Bye!"},
{"potatoes": 10, "radishes": 5, "meat": 8, "name": "xpotatoes", "meta": "Say,Good,Bye!"},
{"radishes": 20, "carrots": 4, "potatoes": 10, "name": "xpotatoes", "meta": "Say,Good,Bye!"}
], config={"number": 5}).on(bspump.trigger.OpportunisticTrigger(app, chilldown_period=10)),

bspump.declarative.DeclarativeProcessor(app, self, declaration),

self.DeclarativeProcessor,
bspump.common.PPrintSink(app, self)
)


class VegetableCounterApplication(bspump.BSPumpApplication):

def __init__(self):
super().__init__()
async def initialize(self):
svc = self.get_service("bspump.PumpService")
svc.add_pipeline(VegetableCounterPipeline(self))

pipeline = VegetableCounterPipeline(self)
await pipeline.DeclarativeProcessor.initialize()
svc.add_pipeline(pipeline)

pipeline.start()


if __name__ == '__main__':
Expand Down

0 comments on commit aff16e7

Please sign in to comment.