-
Notifications
You must be signed in to change notification settings - Fork 9
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Regarding the writemultiple support #49
Comments
Hey Joel, am I close at all? I am trying to mimic the import sys
import asyncio
from typing import Callable
from bacpypes3.settings import settings
from bacpypes3.debugging import bacpypes_debugging, ModuleLogger
from bacpypes3.argparse import ArgumentParser
from bacpypes3.console import Console
from bacpypes3.cmd import Cmd
from bacpypes3.pdu import Address, IPv4Address
from bacpypes3.comm import bind
from bacpypes3.apdu import (
ErrorRejectAbortNack,
WritePropertyMultipleRequest,
PropertyValue,
)
from bacpypes3.basetypes import PropertyReference, WriteAccessSpecification
from bacpypes3.primitivedata import ObjectIdentifier, Real, Unsigned
from bacpypes3.ipv4.app import NormalApplication
from bacpypes3.local.device import DeviceObject
# some debugging
_debug = 0
_log = ModuleLogger(globals())
# globals
app: NormalApplication = None
@bacpypes_debugging
class SampleCmd(Cmd):
"""
Sample Cmd for WritePropertyMultiple
"""
_debug: Callable[..., None]
async def do_wpm(
self,
address: Address,
*args: str,
) -> None:
"""
usage: wpm address ( objid prop value priority )...
"""
if _debug:
SampleCmd._debug("do_wpm %r %r", address, args)
# ensure proper argument structure
if len(args) % 4 != 0:
await self.response("Each property write requires: objid prop value priority")
return
args = list(args)
parameter_list = []
while args:
# extract the object identifier, property identifier, value, and priority
object_identifier = ObjectIdentifier(args.pop(0))
property_identifier = args.pop(0)
value = Real(float(args.pop(0))) # assuming you're writing to Real values
priority = Unsigned(int(args.pop(0)))
# create PropertyReference for the write access
property_reference = PropertyReference(
propertyIdentifier=property_identifier
)
# construct PropertyValue (with priority)
property_value = PropertyValue(
propertyIdentifier=property_identifier,
value=value,
priority=priority,
)
# build the write access specification
write_access_spec = WriteAccessSpecification(
objectIdentifier=object_identifier,
listOfProperties=[property_value], # list of PropertyValue
)
parameter_list.append(write_access_spec)
if not parameter_list:
await self.response("No properties to write")
return
try:
# create WritePropertyMultipleRequest
write_property_multiple_request = WritePropertyMultipleRequest(
listOfWriteAccessSpecs=parameter_list,
destination=address,
)
# send the request
response = await app.request(write_property_multiple_request)
if _debug:
SampleCmd._debug(" - response: %r", response)
if isinstance(response, ErrorRejectAbortNack):
await self.response(f"Error: {response}")
else:
await self.response("Write successful!")
except ErrorRejectAbortNack as err:
if _debug:
SampleCmd._debug(" - exception: %r", err)
await self.response(f"Error: {err}")
async def main() -> None:
global app
try:
console = cmd = app = None
parser = ArgumentParser()
parser.add_argument(
"address",
type=str,
help="target device address",
)
args = parser.parse_args()
if _debug:
_log.debug("args: %r", args)
_log.debug("settings: %r", settings)
# evaluate the address
ipv4_address = IPv4Address(args.address)
if _debug:
_log.debug("ipv4_address: %r", ipv4_address)
# build a very small stack
console = Console()
cmd = SampleCmd()
bind(console, cmd)
# make a device object
local_device = DeviceObject(
objectIdentifier=("device", 1),
objectName="test",
vendorIdentifier=999,
)
if _debug:
_log.debug("local_device: %r", local_device)
# build the application
app = NormalApplication(local_device, ipv4_address)
if _debug:
_log.debug("app: %r", app)
# run until the console is done, canceled or EOF
await console.fini.wait()
except KeyboardInterrupt:
if _debug:
_log.debug("keyboard interrupt")
finally:
if app:
app.close()
if console and console.exit_status:
sys.exit(console.exit_status)
if __name__ == "__main__":
asyncio.run(main())
I get a few errors...Full trace back
|
There isn't a |
@bbartling I put your code in the sandbox and updated it a little (using The code isn't quite up to snuff; the property array index isn't handled correctly, the priority is optional, and there can be more than one |
Hi Sir,
@JoelBender
Does BACpypes3 support the WriteMultiple property? If not, please guide me on how to implement it Sir.
The text was updated successfully, but these errors were encountered: