Skip to content

_stream

_stream

EventStream — main entry point for the event polling system.

EventStream

EventStream(
    api: CocApi,
    queue_size: int = 1000,
    persist_path: Path | str | None = None,
)

Real-time event stream for Clash of Clans resources.

Usage as async generator::

async with CocApi("token") as api:
    stream = EventStream(api)
    stream.watch_clans(["#ABC"], interval=60)

    async with stream:
        async for event in stream:
            print(event.event_type, event.changes)

Usage with callbacks::

@stream.on(EventType.MEMBER_JOINED)
async def on_join(event):
    print(f"{event.metadata['member_name']} joined!")

await stream.run()
PARAMETER DESCRIPTION
api

A CocApi instance in async mode (inside async with).

TYPE: CocApi

queue_size

Max events buffered before backpressure (0 = unlimited).

TYPE: int DEFAULT: 1000

persist_path

Optional path for state persistence across restarts.

TYPE: Path | str | None DEFAULT: None

Initialize the event stream.

PARAMETER DESCRIPTION
api

A CocApi instance in async mode (inside async with).

TYPE: CocApi

queue_size

Max events buffered before backpressure (0 = unlimited).

TYPE: int DEFAULT: 1000

persist_path

Optional path for state persistence across restarts.

TYPE: Path | str | None DEFAULT: None

RAISES DESCRIPTION
RuntimeError

If the CocApi instance is not in async mode.

Source code in cocapi/events/_stream.py
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
def __init__(
    self,
    api: CocApi,
    queue_size: int = 1000,
    persist_path: Path | str | None = None,
) -> None:
    """Set up an event stream bound to an async-mode API client.

    Args:
        api: The ``CocApi`` client used by every watcher; its
            ``async_mode`` flag must be truthy.
        queue_size: ``maxsize`` handed to the internal ``asyncio.Queue``
            that buffers events (0 = unlimited).
        persist_path: Where polling state is loaded from now and saved to
            later. Any falsy value (``None`` or ``""``) disables
            persistence.

    Raises:
        RuntimeError: If ``api.async_mode`` is falsy.
    """
    # Fail fast, before any state is attached to the instance.
    if not api.async_mode:
        raise RuntimeError(
            "EventStream requires CocApi in async mode. "
            "Use 'async with CocApi(token) as api:'"
        )

    self._api = api
    self._state = PollingState()
    self._queue: asyncio.Queue[Event] = asyncio.Queue(maxsize=queue_size)
    self._watchers: list[
        ClanWatcher | WarWatcher | PlayerWatcher | MaintenanceWatcher
    ] = []
    # Callbacks are grouped by event type; the ``None`` key holds the
    # callbacks registered without a type filter.
    self._callbacks: dict[EventType | None, list[EventCallback]] = {}
    self._running = False

    if persist_path:
        # Normalise to a Path once, then restore any previously saved state.
        self._persist_path = Path(persist_path)
        self._state.load(self._persist_path)
    else:
        self._persist_path = None

watch_clans

watch_clans(
    tags: list[str],
    interval: float = 60.0,
    track_members: bool = True,
) -> EventStream

Register clans for polling.

PARAMETER DESCRIPTION
tags

Clan tags to watch.

TYPE: list[str]

interval

Seconds between polls (default 60).

TYPE: float DEFAULT: 60.0

track_members

Track member joins/leaves/updates (default True).

TYPE: bool DEFAULT: True

Source code in cocapi/events/_stream.py
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
def watch_clans(
    self,
    tags: list[str],
    interval: float = 60.0,
    track_members: bool = True,
) -> EventStream:
    """Add a clan watcher for the given clan tags.

    The watcher is only registered here; it is started later by
    ``start()``.

    Args:
        tags: Clan tags to watch.
        interval: Seconds between polls (default 60).
        track_members: Track member joins/leaves/updates (default True).

    Returns:
        This stream instance.
    """
    # The watcher shares this stream's API client, polling state and queue.
    self._watchers.append(
        ClanWatcher(
            api=self._api,
            state=self._state,
            queue=self._queue,
            clan_tags=tags,
            interval=interval,
            track_members=track_members,
        )
    )
    return self

watch_wars

watch_wars(
    tags: list[str], interval: float = 30.0
) -> EventStream

Register clans for war state polling.

PARAMETER DESCRIPTION
tags

Clan tags to watch for war updates.

TYPE: list[str]

interval

Seconds between polls (default 30).

TYPE: float DEFAULT: 30.0

Source code in cocapi/events/_stream.py
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
def watch_wars(
    self,
    tags: list[str],
    interval: float = 30.0,
) -> EventStream:
    """Add a war watcher for the given clan tags.

    The watcher is only registered here; it is started later by
    ``start()``.

    Args:
        tags: Clan tags to watch for war updates.
        interval: Seconds between polls (default 30).

    Returns:
        This stream instance.
    """
    # The watcher shares this stream's API client, polling state and queue.
    self._watchers.append(
        WarWatcher(
            api=self._api,
            state=self._state,
            queue=self._queue,
            clan_tags=tags,
            interval=interval,
        )
    )
    return self

watch_players

watch_players(
    tags: list[str],
    interval: float = 120.0,
    include_fields: frozenset[str] | None = None,
    exclude_fields: frozenset[str] | None = None,
) -> EventStream

Register players for polling.

PARAMETER DESCRIPTION
tags

Player tags to watch.

TYPE: list[str]

interval

Seconds between polls (default 120).

TYPE: float DEFAULT: 120.0

include_fields

Only report changes to these fields.

TYPE: frozenset[str] | None DEFAULT: None

exclude_fields

Ignore changes to these fields.

TYPE: frozenset[str] | None DEFAULT: None

Source code in cocapi/events/_stream.py
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
def watch_players(
    self,
    tags: list[str],
    interval: float = 120.0,
    include_fields: frozenset[str] | None = None,
    exclude_fields: frozenset[str] | None = None,
) -> EventStream:
    """Add a player watcher for the given player tags.

    The watcher is only registered here; it is started later by
    ``start()``.

    Args:
        tags: Player tags to watch.
        interval: Seconds between polls (default 120).
        include_fields: Only report changes to these fields.
        exclude_fields: Ignore changes to these fields.

    Returns:
        This stream instance.
    """
    # Both field filters are forwarded untouched to the watcher.
    self._watchers.append(
        PlayerWatcher(
            api=self._api,
            state=self._state,
            queue=self._queue,
            player_tags=tags,
            interval=interval,
            include_fields=include_fields,
            exclude_fields=exclude_fields,
        )
    )
    return self

watch_maintenance

watch_maintenance(
    interval: float = 30.0, probe_tag: str = "#JY9J2Y99"
) -> EventStream

Enable API maintenance detection.

Polls a known player endpoint to detect 503 maintenance responses. Emits MAINTENANCE_START and MAINTENANCE_END events.

PARAMETER DESCRIPTION
interval

Seconds between probes (default 30).

TYPE: float DEFAULT: 30.0

probe_tag

Player tag to probe (default #JY9J2Y99).

TYPE: str DEFAULT: '#JY9J2Y99'

Source code in cocapi/events/_stream.py
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
def watch_maintenance(
    self,
    interval: float = 30.0,
    probe_tag: str = "#JY9J2Y99",
) -> EventStream:
    """Add a watcher that detects API maintenance windows.

    Polls a known player endpoint to detect 503 maintenance responses and
    emits ``MAINTENANCE_START`` and ``MAINTENANCE_END`` events. The watcher
    is only registered here; it is started later by ``start()``.

    Args:
        interval: Seconds between probes (default 30).
        probe_tag: Player tag to probe (default ``#JY9J2Y99``).

    Returns:
        This stream instance.
    """
    # The watcher shares this stream's API client, polling state and queue.
    self._watchers.append(
        MaintenanceWatcher(
            api=self._api,
            state=self._state,
            queue=self._queue,
            interval=interval,
            probe_tag=probe_tag,
        )
    )
    return self

on

on(
    event_type: EventType | None = None,
) -> Callable[[EventCallback], EventCallback]

Decorator to register an event callback.

PARAMETER DESCRIPTION
event_type

Filter for specific event type. None matches all.

TYPE: EventType | None DEFAULT: None

Source code in cocapi/events/_stream.py
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
def on(
    self,
    event_type: EventType | None = None,
) -> Callable[[EventCallback], EventCallback]:
    """Build a decorator that registers the decorated function as a callback.

    Args:
        event_type: Event type the callback is filed under. ``None`` (the
            default) files it under the catch-all key.

    Returns:
        A decorator that stores the function and returns it unchanged.
    """

    def register(handler: EventCallback) -> EventCallback:
        # Create the per-type list on first use, then append to it.
        handlers = self._callbacks.get(event_type)
        if handlers is None:
            handlers = self._callbacks[event_type] = []
        handlers.append(handler)
        return handler

    return register

add_callback

add_callback(
    callback: EventCallback,
    event_type: EventType | None = None,
) -> None

Register an event callback programmatically.

Source code in cocapi/events/_stream.py
200
201
202
203
204
205
206
def add_callback(
    self,
    callback: EventCallback,
    event_type: EventType | None = None,
) -> None:
    """Register ``callback`` without using the decorator form.

    Args:
        callback: The callback to store.
        event_type: Event type the callback is filed under. ``None`` (the
            default) files it under the catch-all key.
    """
    # Fetch (or create) the list for this event type, then append to it.
    bucket = self._callbacks.setdefault(event_type, [])
    bucket.append(callback)

start async

start() -> None

Start all registered watchers.

Source code in cocapi/events/_stream.py
219
220
221
222
223
async def start(self) -> None:
    """Mark the stream as running and start each registered watcher.

    Watchers are started in the order they were registered.
    """
    self._running = True
    for registered in self._watchers:
        registered.start()

stop async

stop() -> None

Stop all watchers and persist state if configured.

Source code in cocapi/events/_stream.py
225
226
227
228
229
230
231
async def stop(self) -> None:
    """Stop all watchers and persist state if configured.

    Every watcher is asked to stop even when an earlier one fails, and
    state is saved (when a persist path is set) regardless of how the
    stopping went, so one misbehaving watcher can neither leave the others
    running nor lose the polling state.

    Raises:
        Exception: The first error raised by a watcher's ``stop()``,
            re-raised after all watchers were asked to stop and state was
            saved. Errors from later watchers are dropped in its favour.
    """
    self._running = False
    first_error: Exception | None = None
    try:
        for watcher in self._watchers:
            try:
                await watcher.stop()
            except Exception as exc:
                # Keep going so the remaining watchers still get stopped;
                # remember only the first failure for the caller.
                if first_error is None:
                    first_error = exc
    finally:
        # Runs on cancellation as well, so a saved state is always written.
        if self._persist_path:
            self._state.save(self._persist_path)
    if first_error is not None:
        raise first_error

run async

run() -> None

Run the stream with callback dispatch until stopped.

Call stream.stop() from a callback or signal handler to exit.

Source code in cocapi/events/_stream.py
233
234
235
236
237
238
239
240
async def run(self) -> None:
    """Consume the stream and hand every event to the callback dispatcher.

    Enters the stream as an async context manager, iterates it, and awaits
    ``_dispatch`` for each event in turn. Call ``stream.stop()`` from a
    callback or signal handler to exit.
    """
    async with self:
        async for incoming in self:
            await self._dispatch(incoming)