-
Notifications
You must be signed in to change notification settings - Fork 48
/
func.py
74 lines (64 loc) · 2.71 KB
/
func.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
#
# Copyright (c) 2021 Oracle, Inc. All rights reserved.
# Licensed under the Universal Permissive License v 1.0 as shown at https://oss.oracle.com/licenses/upl.
#
import os
import io
import json
import logging
import base64
from fdk import response
from borneo import (
AuthorizationProvider, DeleteRequest, GetRequest,
IllegalArgumentException, NoSQLHandle, NoSQLHandleConfig, PutRequest,
QueryRequest, Regions, TableLimits, TableRequest)
from borneo.iam import SignatureProvider
from borneo.kv import StoreAccessTokenProvider
import traceback
def handler(ctx, data: io.BytesIO=None):
logger = logging.getLogger()
logger.setLevel(logging.INFO)
try:
store_handle = get_handle()
logs = json.loads(data.getvalue())
logger.info('Received {} entries.'.format(len(logs)))
for item in logs:
if 'value' in item:
item['value'] = base64_decode(item['value'])
if 'key' in item:
item['key'] = base64_decode(item['key'])
#
# For demo purpose, we are inserting the data as received, no processing
# Put rows in NoSQL
#
request = PutRequest().set_table_name('demoKeyVal')
for item in logs:
if 'value' in item:
value = { 'value': json.loads(item['value'])}
request.set_value(value)
store_handle.put(request)
request = PutRequest().set_table_name('demo')
for item in logs:
if 'value' in item:
request.set_value_from_json(item['value'])
store_handle.put(request)
#
# return data
#
return response.Response(ctx, status_code=200, response_data=logs, headers={"Content-Type": "text/plain"})
except (Exception, ValueError) as e:
# For demo purpose, I am ignoring all errors
# During tests, I sent some no JSON messages to my Stream that were broken all
# If there is an error, in the next iteration I will receive all pending message from the Stream
# so continue to have the same error
logger.error('Logging and ignore the error ' + str(e))
return response.Response(ctx, status_code=200, response_data=traceback.format_exc(), headers={"Content-Type": "text/plain"})
def base64_decode(encoded):
if not encoded: return
base64_bytes = encoded.encode('utf-8')
message_bytes = base64.b64decode(base64_bytes)
return message_bytes.decode('utf-8')
def get_handle():
provider = SignatureProvider.create_with_resource_principal()
config = NoSQLHandleConfig(os.getenv('NOSQL_REGION'), provider).set_logger(None).set_default_compartment(os.getenv('NOSQL_COMPARTMENT_ID'))
return NoSQLHandle(config)