Skip to content

Commit

Permalink
Update documentation (#628)
Browse files Browse the repository at this point in the history
Co-authored-by: daniil-quix <133032822+daniil-quix@users.noreply.github.com>
  • Loading branch information
github-actions[bot] and daniil-quix authored Nov 15, 2024
1 parent 409acb9 commit fef0428
Show file tree
Hide file tree
Showing 2 changed files with 307 additions and 36 deletions.
215 changes: 197 additions & 18 deletions docs/api-reference/quixstreams.md
Original file line number Diff line number Diff line change
Expand Up @@ -10533,6 +10533,7 @@ processing all nested files within it.

Expects folder and file structures as generated by the related FileSink connector:

```
my_topics/
├── topic_a/
│ ├── 0/
Expand All @@ -10542,7 +10543,8 @@ my_topics/
│ ├── 0003.ext
│ └── 0016.ext
└── topic_b/
└── etc...
└── etc...
```

Intended to be used with a single topic (ex: topic_a), but will recursively read
from whatever entrypoint is passed to it.
Expand All @@ -10551,22 +10553,23 @@ File format structure depends on the file format.

See the `.formats` and `.compressions` modules to see what is supported.

**Example**:
Example Usage:


from quixstreams import Application
from quixstreams.sources.community.file import FileSource

app = Application(broker_address="localhost:9092", auto_offset_reset="earliest")
source = FileSource(
filepath="/path/to/my/topic_folder",
file_format="json",
file_compression="gzip",
)
sdf = app.dataframe(source=source).print(metadata=True)

if __name__ == "__main__":
app.run()
```python
from quixstreams import Application
from quixstreams.sources.community.file import FileSource

app = Application(broker_address="localhost:9092", auto_offset_reset="earliest")
source = FileSource(
filepath="/path/to/my/topic_folder",
file_format="json",
file_compression="gzip",
)
sdf = app.dataframe(source=source).print(metadata=True)

if __name__ == "__main__":
app.run()
```

<a id="quixstreams.sources.community.file.file.FileSource.__init__"></a>

Expand All @@ -10581,7 +10584,7 @@ def __init__(filepath: Union[str, Path],
shutdown_timeout: float = 10)
```

[[VIEW SOURCE]](https://github.com/quixio/quix-streams/blob/main/quixstreams/sources/community/file/file.py#L59)
[[VIEW SOURCE]](https://github.com/quixio/quix-streams/blob/main/quixstreams/sources/community/file/file.py#L63)

**Arguments**:

Expand All @@ -10606,7 +10609,7 @@ to gracefully shutdown
def default_topic() -> Topic
```

[[VIEW SOURCE]](https://github.com/quixio/quix-streams/blob/main/quixstreams/sources/community/file/file.py#L106)
[[VIEW SOURCE]](https://github.com/quixio/quix-streams/blob/main/quixstreams/sources/community/file/file.py#L110)

Uses the file structure to generate the desired partition count for the

Expand Down Expand Up @@ -10634,6 +10637,182 @@ the original default topic, with updated partition count

This module contains Sources developed and maintained by the members of Quix Streams community.

<a id="quixstreams.sources.community.pubsub"></a>

## quixstreams.sources.community.pubsub

<a id="quixstreams.sources.community.pubsub.consumer"></a>

## quixstreams.sources.community.pubsub.consumer

<a id="quixstreams.sources.community.pubsub.consumer.PubSubSubscriptionNotFound"></a>

### PubSubSubscriptionNotFound

```python
class PubSubSubscriptionNotFound(Exception)
```

[[VIEW SOURCE]](https://github.com/quixio/quix-streams/blob/main/quixstreams/sources/community/pubsub/consumer.py#L30)

Raised when an expected subscription does not exist

<a id="quixstreams.sources.community.pubsub.consumer.PubSubConsumer"></a>

### PubSubConsumer

```python
class PubSubConsumer()
```

[[VIEW SOURCE]](https://github.com/quixio/quix-streams/blob/main/quixstreams/sources/community/pubsub/consumer.py#L34)

<a id="quixstreams.sources.community.pubsub.consumer.PubSubConsumer.poll_and_process"></a>

#### PubSubConsumer.poll\_and\_process

```python
def poll_and_process(timeout: Optional[float] = None)
```

[[VIEW SOURCE]](https://github.com/quixio/quix-streams/blob/main/quixstreams/sources/community/pubsub/consumer.py#L105)

This uses the asynchronous puller to retrieve and handle a message with its
assigned callback.

Committing is a separate step.

<a id="quixstreams.sources.community.pubsub.consumer.PubSubConsumer.poll_and_process_batch"></a>

#### PubSubConsumer.poll\_and\_process\_batch

```python
def poll_and_process_batch()
```

[[VIEW SOURCE]](https://github.com/quixio/quix-streams/blob/main/quixstreams/sources/community/pubsub/consumer.py#L122)

Polls and processes until either the max_batch_size or batch_timeout is reached.

<a id="quixstreams.sources.community.pubsub.consumer.PubSubConsumer.subscribe"></a>

#### PubSubConsumer.subscribe

```python
def subscribe()
```

[[VIEW SOURCE]](https://github.com/quixio/quix-streams/blob/main/quixstreams/sources/community/pubsub/consumer.py#L134)

Asynchronous subscribers require subscribing (synchronous do not).

NOTE: This will not detect whether the subscription exists.

<a id="quixstreams.sources.community.pubsub.consumer.PubSubConsumer.handle_subscription"></a>

#### PubSubConsumer.handle\_subscription

```python
def handle_subscription() -> Subscription
```

[[VIEW SOURCE]](https://github.com/quixio/quix-streams/blob/main/quixstreams/sources/community/pubsub/consumer.py#L144)

Handles subscription management in one place.

Subscriptions work similarly to Kafka consumer groups.

- Each topic can have multiple subscriptions (consumer group ~= subscription).

- A subscription can have multiple subscribers (similar to consumers in a group).

- NOTE: exactly-once adds message methods (ack_with_response) when enabled.

<a id="quixstreams.sources.community.pubsub.pubsub"></a>

## quixstreams.sources.community.pubsub.pubsub

<a id="quixstreams.sources.community.pubsub.pubsub.PubSubSource"></a>

### PubSubSource

```python
class PubSubSource(Source)
```

[[VIEW SOURCE]](https://github.com/quixio/quix-streams/blob/main/quixstreams/sources/community/pubsub/pubsub.py#L16)

This source enables reading from a Google Cloud Pub/Sub topic,
dumping it to a kafka topic using desired SDF-based transformations.

Provides "at-least-once" guarantees.

Currently, forwarding message keys ("ordered messages" in Pub/Sub) is unsupported.

The incoming message value will be in bytes, so transform in your SDF accordingly.

Example Usage:

```python
from quixstreams import Application
from quixstreams.sources.community.pubsub import PubSubSource
from os import environ

source = PubSubSource(
# Suggested: pass JSON-formatted credentials from an environment variable.
service_account_json = environ["PUBSUB_SERVICE_ACCOUNT_JSON"],
project_id="<project ID>",
topic_id="<topic ID>", # NOTE: NOT the full /x/y/z path!
subscription_id="<subscription ID>", # NOTE: NOT the full /x/y/z path!
create_subscription=True,
)
app = Application(
broker_address="localhost:9092",
auto_offset_reset="earliest",
consumer_group="gcp",
loglevel="INFO"
)
sdf = app.dataframe(source=source).print(metadata=True)

if __name__ == "__main__":
app.run()
```

<a id="quixstreams.sources.community.pubsub.pubsub.PubSubSource.__init__"></a>

#### PubSubSource.\_\_init\_\_

```python
def __init__(project_id: str,
topic_id: str,
subscription_id: str,
service_account_json: Optional[str] = None,
commit_every: int = 100,
commit_interval: float = 5.0,
create_subscription: bool = False,
enable_message_ordering: bool = False,
shutdown_timeout: float = 10.0)
```

[[VIEW SOURCE]](https://github.com/quixio/quix-streams/blob/main/quixstreams/sources/community/pubsub/pubsub.py#L55)

**Arguments**:

- `project_id`: a Google Cloud project ID.
- `topic_id`: a Pub/Sub topic ID (NOT the full path).
- `subscription_id`: a Pub/Sub subscription ID (NOT the full path).
- `service_account_json`: a Google Cloud Credentials JSON as a string
Can instead use environment variables (which have different behavior):
- "GOOGLE_APPLICATION_CREDENTIALS" set to a JSON filepath i.e. /x/y/z.json
- "PUBSUB_EMULATOR_HOST" set to a URL if using an emulated Pub/Sub
- `commit_every`: max records allowed to be processed before committing.
- `commit_interval`: max allowed elapsed time between commits.
- `create_subscription`: whether to attempt to create a subscription at
startup; if it already exists, it instead logs its details (DEBUG level).
- `enable_message_ordering`: When creating a Pub/Sub subscription, whether
to allow message ordering. NOTE: does NOT affect existing subscriptions!
- `shutdown_timeout`: How long to wait for a graceful shutdown of the source.

<a id="quixstreams.sources.base"></a>

## quixstreams.sources.base
Expand Down
Loading

0 comments on commit fef0428

Please sign in to comment.