Skip to content

API Reference

Full reference for all public modules in the cocapi package. Select a module from the navigation to view its documentation.

cocapi

Clash of Clans API Wrapper - cocapi v4.0.0

A Python wrapper for the official Clash of Clans API with enhanced features:

- Async and sync support
- Request caching with TTL
- Automatic retries with exponential backoff
- Request metrics and monitoring
- Middleware system for request/response processing
- Dynamic Pydantic model generation
- Custom endpoint support for future API changes
- Configurable base URL with safety warnings

Basic Usage

from cocapi import CocApi

api = CocApi("your_api_token")
clan = api.clan_tag("#CLAN_TAG")

Advanced Usage

from cocapi import CocApi, ApiConfig

config = ApiConfig(
    enable_caching=True,
    cache_ttl=600,
    max_retries=3,
    enable_metrics=True,
)

api = CocApi("your_token", config=config)

Async Usage

from cocapi import CocApi, ApiConfig

async def get_clan_info():
    config = ApiConfig(enable_rate_limiting=True)
    async with CocApi("token", config=config) as api:
        clan = await api.clan_tag("#CLAN_TAG")
        return clan

AsyncCocApiCore

AsyncCocApiCore(token: str, config: ApiConfig)

Core async functionality for CocApi

Initialize async core

PARAMETER DESCRIPTION
token

API token

TYPE: str

config

Configuration object

TYPE: ApiConfig

Source code in cocapi/async_client.py
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
def __init__(self, token: str, config: ApiConfig):
    """
    Set up the async core: credentials, helper components and lazy state.

    No HTTP client is created here; ``_client`` starts out as ``None``.

    Args:
        token: API token, sent as a Bearer token in the request headers
        config: Configuration object driving caching, metrics and rate limiting
    """
    self.token = token
    self.config = config
    self.headers = {
        "authorization": f"Bearer {token}",
        "Accept": "application/json",
    }

    # Response cache, switched on or off according to the configuration
    self.cache = CacheManager(default_ttl=config.cache_ttl)
    if config.enable_caching:
        self.cache.enable()
    else:
        self.cache.disable()

    # Request metrics, switched on or off according to the configuration
    self.metrics = MetricsTracker(max_metrics=config.metrics_window_size)
    if config.enable_metrics:
        self.metrics.enable()
    else:
        self.metrics.disable()

    self.middleware = MiddlewareManager()

    # HTTP client state; the client itself is created later
    self._client: httpx.AsyncClient | None = None
    self._should_close_client = False

    # Key manager state (populated through set_key_manager_state())
    self._km_email: str | None = None
    self._km_password: str | None = None
    self._km_key_name: str | None = None
    self._km_key_count: int = 1
    self._km_auto_refresh: bool = False
    self._km_persist_keys: bool = False
    self._km_key_storage_path: str | None = None

    # A limiter only exists when rate limiting is requested
    self._rate_limiter: AsyncRateLimiter | None = (
        AsyncRateLimiter(
            rate=config.requests_per_second, burst=config.burst_limit
        )
        if config.enable_rate_limiting
        else None
    )

__aenter__ async

__aenter__() -> AsyncCocApiCore

Async context manager entry

Source code in cocapi/async_client.py
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
async def __aenter__(self) -> "AsyncCocApiCore":
    """
    Enter the async context, creating an HTTP client if none is set.

    A client created here is flagged via ``_should_close_client`` so that
    the matching exit can close it; an already-present client is left
    untouched.
    """
    if self._client is not None:
        return self

    settings = self.config
    request_timeout = settings.timeout
    pool_limits = httpx.Limits(
        max_connections=settings.max_connections,
        max_keepalive_connections=settings.max_keepalive_connections,
    )
    self._client = httpx.AsyncClient(
        timeout=request_timeout,
        limits=pool_limits,
        http2=True,
    )
    self._should_close_client = True
    return self

__aexit__ async

__aexit__(exc_type: Any, exc_val: Any, exc_tb: Any) -> None

Async context manager exit

Source code in cocapi/async_client.py
114
115
116
117
118
119
async def __aexit__(self, exc_type: Any, exc_val: Any, exc_tb: Any) -> None:
    """
    Leave the async context, closing the HTTP client if this object owns it.

    The exception arguments are accepted but not inspected. Nothing happens
    unless ``_should_close_client`` is set and a client is present.
    """
    owns_open_client = self._should_close_client and self._client
    if not owns_open_client:
        return

    await self._client.aclose()
    self._client = None
    self._should_close_client = False

set_key_manager_state

set_key_manager_state(
    email: str,
    password: str,
    key_name: str,
    key_count: int,
    auto_refresh: bool,
    persist_keys: bool = False,
    key_storage_path: str | None = None,
) -> None

Store key manager credentials for auto-refresh on 403.

Source code in cocapi/async_client.py
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
def set_key_manager_state(
    self,
    email: str,
    password: str,
    key_name: str,
    key_count: int,
    auto_refresh: bool,
    persist_keys: bool = False,
    key_storage_path: str | None = None,
) -> None:
    """Store key manager credentials for auto-refresh on 403."""
    self._km_email = email
    self._km_password = password
    self._km_key_name = key_name
    self._km_key_count = key_count
    self._km_auto_refresh = auto_refresh
    self._km_persist_keys = persist_keys
    self._km_key_storage_path = key_storage_path

make_request async

make_request(
    endpoint: str,
    params: dict[str, Any] | None = None,
    use_dynamic_model: bool = False,
    _refresh_attempted: bool = False,
) -> dict[str, Any]

Make an async API request

PARAMETER DESCRIPTION
endpoint

API endpoint path

TYPE: str

params

Request parameters

TYPE: dict[str, Any] | None DEFAULT: None

use_dynamic_model

Whether to create dynamic Pydantic models

TYPE: bool DEFAULT: False

RETURNS DESCRIPTION
dict[str, Any]

API response as dictionary

Source code in cocapi/async_client.py
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
async def make_request(
    self,
    endpoint: str,
    params: dict[str, Any] | None = None,
    use_dynamic_model: bool = False,
    _refresh_attempted: bool = False,
) -> dict[str, Any]:
    """
    Make an async GET request against the API.

    Steps, in order: cache lookup (when ``config.enable_caching`` is set),
    rate limiting (when a limiter is configured), request middleware, then
    up to ``config.max_retries`` GET attempts. A retried attempt is preceded
    by a sleep of ``config.retry_delay * 2**attempt`` seconds.

    Anything raised inside the retry loop's ``try`` block is caught and
    turned into the value produced by the ``_handle_*`` helpers, which is
    returned (or retried) instead of propagating. Exceptions raised before
    the loop (URL building, cache lookup, rate limiter, request middleware)
    are not caught here.

    Args:
        endpoint: API endpoint path
        params: Request parameters
        use_dynamic_model: Whether to pass the response through
            ``create_dynamic_model`` before returning it
        _refresh_attempted: Internal flag. It is set to True on the recursive
            call made after a successful key refresh, so the refresh path
            runs at most once per top-level call.

    Returns:
        API response as dictionary (or the result of ``create_dynamic_model``
        when ``use_dynamic_model`` is set), or the error value built by the
        ``_handle_*`` helpers on failure
    """
    # start_time is taken before the cache/rate-limit steps, so every
    # response_time recorded below also includes rate-limit waiting and any
    # earlier attempts plus their backoff sleeps.
    start_time = time.time()
    cache_hit = False

    # Build URL
    url = build_url(self.config.base_url, endpoint, params)

    # Check cache first
    if self.config.enable_caching:
        cached_response = self.cache.get(url, params)
        if cached_response is not None:
            cache_hit = True

            # Record metrics for cache hit
            if self.config.enable_metrics:
                self.metrics.record_request(
                    endpoint=endpoint,
                    method="GET",
                    status_code=200,
                    response_time=time.time() - start_time,
                    cache_hit=True,
                )

            # Apply dynamic model if requested
            if use_dynamic_model:
                cached_response = create_dynamic_model(cached_response, endpoint)

            return cached_response

    # Apply rate limiting (acquired once per call; retries below do not
    # acquire again)
    if self._rate_limiter:
        await self._rate_limiter.acquire()

    # Apply request middleware
    # NOTE(review): from here on `url` and `params` are the middleware's
    # return values (`params` is at least `{}`), while the cache lookup above
    # used the caller's original values. Confirm get_cache_key() yields the
    # same key for both, otherwise stored entries are never found again.
    # NOTE(review): the URL is not rebuilt after middleware, so changes the
    # middleware makes to `params` only affect the cache key, not the request.
    headers = self.headers.copy()
    url, headers, params = self.middleware.apply_request_middleware(
        url, headers, params or {}
    )

    # Make the request with retries
    for attempt in range(self.config.max_retries):
        try:
            # NOTE(review): this RuntimeError is raised inside the try block,
            # so unless it is an httpx.TimeoutException it is caught by the
            # generic `except Exception` handler below and handled (and
            # retried) like a connection error instead of reaching the caller.
            if not self._client:
                raise RuntimeError(
                    "AsyncCocApiCore not initialized. Use 'async with' context manager."
                )

            response = await self._client.get(url, headers=headers)
            response_time = time.time() - start_time

            if is_successful_response(response.status_code):
                # Parse JSON response
                try:
                    json_response = response.json()
                except Exception as e:
                    # An unparsable body ends the call immediately; it is
                    # not retried.
                    error_response = self._handle_json_error(e, attempt)
                    if self.config.enable_metrics:
                        self.metrics.record_request(
                            endpoint=endpoint,
                            method="GET",
                            status_code=response.status_code,
                            response_time=response_time,
                            cache_hit=False,
                            error_type="json",
                        )
                    return error_response

                # Apply response middleware
                json_response = self.middleware.apply_response_middleware(
                    json_response
                )

                # Cache successful response (the middleware-processed data,
                # stored before any dynamic model conversion)
                if self.config.enable_caching:
                    self.cache.set(url, params, json_response)

                # Record metrics (cache_hit is always False on this path:
                # a cache hit returns before the loop is entered)
                if self.config.enable_metrics:
                    self.metrics.record_request(
                        endpoint=endpoint,
                        method="GET",
                        status_code=response.status_code,
                        response_time=response_time,
                        cache_hit=cache_hit,
                    )

                # Apply dynamic model if requested
                if use_dynamic_model:
                    json_response = create_dynamic_model(json_response, endpoint)

                return json_response

            else:
                # Auto-refresh on accessDenied.invalidIp (once only)
                if (
                    response.status_code == 403
                    and not _refresh_attempted
                    and self._should_auto_refresh_keys()
                ):
                    try:
                        body = response.json()
                        if body.get("reason") == "accessDenied.invalidIp":
                            if await self._refresh_token():
                                # Restart the whole request (cache check,
                                # rate limit, middleware) with the flag set
                                # so this branch cannot run a second time.
                                return await self.make_request(
                                    endpoint,
                                    params,
                                    use_dynamic_model,
                                    _refresh_attempted=True,
                                )
                    except Exception:
                        # Auto-refresh is best-effort; fall through to normal error handling
                        pass

                # Handle HTTP errors
                error_response = self._handle_http_error(
                    response.status_code, attempt
                )

                if self.config.enable_metrics:
                    self.metrics.record_request(
                        endpoint=endpoint,
                        method="GET",
                        status_code=response.status_code,
                        response_time=response_time,
                        cache_hit=False,
                        error_type="http",
                    )

                # Give up on statuses should_retry_error() rejects, or when
                # this was the last allowed attempt.
                if (
                    not should_retry_error(response.status_code)
                    or attempt >= self.config.max_retries - 1
                ):
                    return error_response

                # Wait before retry (delay doubles with each attempt)
                await asyncio.sleep(self.config.retry_delay * (2**attempt))

        except httpx.TimeoutException as e:
            error_response = self._handle_network_error(e, attempt)
            response_time = time.time() - start_time

            # status_code=0 is recorded because no HTTP status is available
            # on this path.
            if self.config.enable_metrics:
                self.metrics.record_request(
                    endpoint=endpoint,
                    method="GET",
                    status_code=0,
                    response_time=response_time,
                    cache_hit=False,
                    error_type="timeout",
                )

            if attempt >= self.config.max_retries - 1:
                return error_response

            await asyncio.sleep(self.config.retry_delay * (2**attempt))

        except Exception as e:
            # Catch-all for everything else raised in the try block; it is
            # recorded with error_type="connection" regardless of the actual
            # exception type.
            error_response = self._handle_network_error(e, attempt)
            response_time = time.time() - start_time

            if self.config.enable_metrics:
                self.metrics.record_request(
                    endpoint=endpoint,
                    method="GET",
                    status_code=0,
                    response_time=response_time,
                    cache_hit=False,
                    error_type="connection",
                )

            if attempt >= self.config.max_retries - 1:
                return error_response

            await asyncio.sleep(self.config.retry_delay * (2**attempt))

    # Every path through the final loop iteration returns, so this is only
    # reached when the loop body never runs, i.e. config.max_retries <= 0.
    return {
        "result": "error",
        "message": "Max retries exceeded",
        "error_type": "retry_exhausted",
    }

make_post_request async

make_post_request(
    endpoint: str,
    json_body: dict[str, Any],
    params: dict[str, Any] | None = None,
    use_dynamic_model: bool = False,
    _refresh_attempted: bool = False,
) -> dict[str, Any]

Make an async POST API request

PARAMETER DESCRIPTION
endpoint

API endpoint path

TYPE: str

json_body

JSON body to send with the POST request

TYPE: dict[str, Any]

params

Optional query parameters

TYPE: dict[str, Any] | None DEFAULT: None

use_dynamic_model

Whether to create dynamic Pydantic models

TYPE: bool DEFAULT: False

RETURNS DESCRIPTION
dict[str, Any]

API response as dictionary

Source code in cocapi/async_client.py
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
async def make_post_request(
    self,
    endpoint: str,
    json_body: dict[str, Any],
    params: dict[str, Any] | None = None,
    use_dynamic_model: bool = False,
    _refresh_attempted: bool = False,
) -> dict[str, Any]:
    """
    Make an async POST API request.

    Steps, in order: rate limiting (when a limiter is configured), request
    middleware, then up to ``config.max_retries`` POST attempts. A retried
    attempt is preceded by a sleep of ``config.retry_delay * 2**attempt``
    seconds. The response cache is neither read nor written by this method.

    Anything raised inside the retry loop's ``try`` block is caught and
    turned into the value produced by the ``_handle_*`` helpers, which is
    returned (or retried) instead of propagating.

    Args:
        endpoint: API endpoint path
        json_body: JSON body to send with the POST request
        params: Optional query parameters
        use_dynamic_model: Whether to pass the response through
            ``create_dynamic_model`` before returning it
        _refresh_attempted: Internal flag. It is set to True on the recursive
            call made after a successful key refresh, so the refresh path
            runs at most once per top-level call.

    Returns:
        API response as dictionary (or the result of ``create_dynamic_model``
        when ``use_dynamic_model`` is set), or the error value built by the
        ``_handle_*`` helpers on failure
    """
    # Every response_time recorded below is measured from here, so it also
    # includes rate-limit waiting and earlier attempts plus their backoff.
    start_time = time.time()

    # Apply rate limiting (acquired once per call; retries below do not
    # acquire again)
    if self._rate_limiter:
        await self._rate_limiter.acquire()

    # Apply request middleware before building URL so middleware can modify params
    headers = self.headers.copy()
    url_base = build_url(self.config.base_url, endpoint, None)
    url_base, headers, params = self.middleware.apply_request_middleware(
        url_base, headers, params or {}
    )
    # Rebuild URL with (potentially modified) params
    # NOTE(review): the `url_base` returned by the middleware is not used
    # again; the final URL is rebuilt from config.base_url and endpoint, so
    # any URL change made by middleware is discarded here. Confirm intended.
    url = build_url(self.config.base_url, endpoint, params or None)

    # Make the request with retries
    for attempt in range(self.config.max_retries):
        try:
            # NOTE(review): this RuntimeError is raised inside the try block,
            # so unless it is an httpx.TimeoutException it is caught by the
            # generic `except Exception` handler below and handled (and
            # retried) like a connection error instead of reaching the caller.
            if not self._client:
                raise RuntimeError(
                    "AsyncCocApiCore not initialized. Use 'async with' context manager."
                )

            response = await self._client.post(url, headers=headers, json=json_body)
            response_time = time.time() - start_time

            if is_successful_response(response.status_code):
                try:
                    json_response = response.json()
                except Exception as e:
                    # An unparsable body ends the call immediately; it is
                    # not retried.
                    error_response = self._handle_json_error(e, attempt)
                    if self.config.enable_metrics:
                        self.metrics.record_request(
                            endpoint=endpoint,
                            method="POST",
                            status_code=response.status_code,
                            response_time=response_time,
                            cache_hit=False,
                            error_type="json",
                        )
                    return error_response

                # Apply response middleware
                json_response = self.middleware.apply_response_middleware(
                    json_response
                )

                # Record metrics
                if self.config.enable_metrics:
                    self.metrics.record_request(
                        endpoint=endpoint,
                        method="POST",
                        status_code=response.status_code,
                        response_time=response_time,
                        cache_hit=False,
                    )

                # Apply dynamic model if requested
                if use_dynamic_model:
                    json_response = create_dynamic_model(json_response, endpoint)

                return json_response

            else:
                # Auto-refresh on accessDenied.invalidIp (once only)
                if (
                    response.status_code == 403
                    and not _refresh_attempted
                    and self._should_auto_refresh_keys()
                ):
                    try:
                        body = response.json()
                        if body.get("reason") == "accessDenied.invalidIp":
                            if await self._refresh_token():
                                # Restart the whole request with the flag set
                                # so this branch cannot run a second time.
                                # `params` here is the middleware-processed
                                # value, which is passed through middleware
                                # again by the recursive call.
                                return await self.make_post_request(
                                    endpoint,
                                    json_body,
                                    params,
                                    use_dynamic_model,
                                    _refresh_attempted=True,
                                )
                    except Exception:
                        # Auto-refresh is best-effort; fall through to normal error handling
                        pass

                # Handle HTTP errors
                error_response = self._handle_http_error(
                    response.status_code, attempt
                )

                if self.config.enable_metrics:
                    self.metrics.record_request(
                        endpoint=endpoint,
                        method="POST",
                        status_code=response.status_code,
                        response_time=response_time,
                        cache_hit=False,
                        error_type="http",
                    )

                # Give up on statuses should_retry_error() rejects, or when
                # this was the last allowed attempt.
                if (
                    not should_retry_error(response.status_code)
                    or attempt >= self.config.max_retries - 1
                ):
                    return error_response

                # Wait before retry (delay doubles with each attempt)
                await asyncio.sleep(self.config.retry_delay * (2**attempt))

        except httpx.TimeoutException as e:
            error_response = self._handle_network_error(e, attempt)
            response_time = time.time() - start_time

            # status_code=0 is recorded because no HTTP status is available
            # on this path.
            if self.config.enable_metrics:
                self.metrics.record_request(
                    endpoint=endpoint,
                    method="POST",
                    status_code=0,
                    response_time=response_time,
                    cache_hit=False,
                    error_type="timeout",
                )

            if attempt >= self.config.max_retries - 1:
                return error_response

            await asyncio.sleep(self.config.retry_delay * (2**attempt))

        except Exception as e:
            # Catch-all for everything else raised in the try block; it is
            # recorded with error_type="connection" regardless of the actual
            # exception type.
            error_response = self._handle_network_error(e, attempt)
            response_time = time.time() - start_time

            if self.config.enable_metrics:
                self.metrics.record_request(
                    endpoint=endpoint,
                    method="POST",
                    status_code=0,
                    response_time=response_time,
                    cache_hit=False,
                    error_type="connection",
                )

            if attempt >= self.config.max_retries - 1:
                return error_response

            await asyncio.sleep(self.config.retry_delay * (2**attempt))

    # Every path through the final loop iteration returns, so this is only
    # reached when the loop body never runs, i.e. config.max_retries <= 0.
    return {
        "result": "error",
        "message": "Max retries exceeded",
        "error_type": "retry_exhausted",
    }

test_connection async

test_connection() -> dict[str, Any]

Test API connection

Source code in cocapi/async_client.py
599
600
601
602
603
604
605
606
607
608
609
610
611
async def test_connection(self) -> dict[str, Any]:
    """
    Probe the API by requesting ``/locations``.

    Returns:
        The response from ``make_request`` unchanged when its ``"result"``
        is ``"error"``; a success dictionary otherwise. Any exception raised
        while probing is reported as an error dictionary instead of being
        propagated.
    """
    try:
        probe = await self.make_request("/locations")
        probe_failed = probe.get("result") == "error"
    except Exception as e:
        return {
            "result": "error",
            "message": f"Connection test failed: {str(e)}",
            "error_type": "connection",
        }

    if probe_failed:
        return probe
    return {"result": "success", "message": "API connection successful"}

AsyncRateLimiter

AsyncRateLimiter(rate: float, burst: int)

Simple async rate limiter using token bucket algorithm

Initialize rate limiter

PARAMETER DESCRIPTION
rate

Requests per second

TYPE: float

burst

Maximum burst requests

TYPE: int

Source code in cocapi/async_client.py
23
24
25
26
27
28
29
30
31
32
33
34
35
def __init__(self, rate: float, burst: int):
    """
    Create a rate limiter whose bucket starts out full.

    Args:
        rate: Requests per second (the token refill rate)
        burst: Maximum burst requests (the bucket capacity)
    """
    self.rate, self.burst = rate, burst
    # Start with a full bucket so an initial burst is allowed immediately
    self.tokens = float(burst)
    # Reference point for computing how many tokens have accrued
    self.last_update = time.time()
    # Guards the token bookkeeping in acquire()
    self._lock = asyncio.Lock()

acquire async

acquire() -> None

Acquire permission to make a request

Source code in cocapi/async_client.py
37
38
39
40
41
42
43
44
45
46
47
48
49
50
async def acquire(self) -> None:
    """
    Acquire permission to make a request.

    Refills the bucket according to the time elapsed since ``last_update``
    (capped at ``burst``), then either spends one token immediately or
    sleeps until one full token has accrued and spends that one.

    The lock is held for the whole call, including the sleep, so concurrent
    callers wait their turn behind a sleeping caller.

    ``rate`` is used as a divisor when waiting, so it is assumed to be
    positive.
    """
    async with self._lock:
        now = time.time()
        elapsed = now - self.last_update
        self.tokens = min(float(self.burst), self.tokens + elapsed * self.rate)
        self.last_update = now

        if self.tokens >= 1:
            self.tokens -= 1
            return

        # Not enough budget: wait for exactly the missing fraction of a token.
        sleep_time = (1 - self.tokens) / self.rate
        await asyncio.sleep(sleep_time)

        # The token that accrued during the wait is the one being spent now.
        # Advance the refill clock by the same interval; otherwise the next
        # call would count the slept time again and hand out a second token
        # for it, letting callers exceed the configured rate.
        self.tokens = 0
        self.last_update = now + sleep_time

CacheManager

CacheManager(default_ttl: int = 300)

Manages response caching with TTL support

Initialize cache manager

PARAMETER DESCRIPTION
default_ttl

Default time-to-live in seconds (5 minutes)

TYPE: int DEFAULT: 300

Source code in cocapi/cache.py
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
def __init__(self, default_ttl: int = 300):
    """
    Create an empty, enabled cache with zeroed statistics.

    Args:
        default_ttl: Time-to-live in seconds applied to entries stored
            without an explicit TTL (defaults to 300, i.e. 5 minutes)
    """
    self.cache: dict[str, CacheEntry] = {}
    self.default_ttl = default_ttl
    self._enabled = True
    # Counters reported by get_stats()
    self._stats = dict.fromkeys(("hits", "misses", "sets", "evictions"), 0)

enable

enable() -> None

Enable caching

Source code in cocapi/cache.py
32
33
34
def enable(self) -> None:
    """
    Turn caching on.

    Only flips the internal flag; entries already stored are left as they are.
    """
    self._enabled = True

disable

disable() -> None

Disable caching

Source code in cocapi/cache.py
36
37
38
def disable(self) -> None:
    """
    Turn caching off.

    Only flips the internal flag; entries already stored are not removed.
    """
    self._enabled = False

is_enabled

is_enabled() -> bool

Check if caching is enabled

Source code in cocapi/cache.py
40
41
42
def is_enabled(self) -> bool:
    """
    Report whether caching is currently switched on.

    Returns:
        The current value of the internal enabled flag
    """
    return self._enabled

get

get(
    url: str, params: dict[str, Any] | None = None
) -> dict[str, Any] | None

Get cached response if available and not expired

PARAMETER DESCRIPTION
url

Request URL

TYPE: str

params

Request parameters

TYPE: dict[str, Any] | None DEFAULT: None

RETURNS DESCRIPTION
dict[str, Any] | None

Cached response data or None if not found/expired

Source code in cocapi/cache.py
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
def get(
    self, url: str, params: dict[str, Any] | None = None
) -> dict[str, Any] | None:
    """
    Look up a cached response for the given request.

    A disabled cache returns None without touching the statistics. An
    expired entry is removed (counted as an eviction) and then treated as
    a miss.

    Args:
        url: Request URL
        params: Request parameters

    Returns:
        The stored response data, or None when nothing usable is cached
    """
    if not self._enabled:
        return None

    key = get_cache_key(url, params)
    now = time.time()

    try:
        entry = self.cache[key]
    except KeyError:
        # Nothing stored under this key
        self._stats["misses"] += 1
        return None

    if entry.is_expired(now):
        # Stale entry: drop it, then report a miss
        del self.cache[key]
        self._stats["evictions"] += 1
        self._stats["misses"] += 1
        return None

    self._stats["hits"] += 1
    return entry.data

set

set(
    url: str,
    params: dict[str, Any] | None,
    data: dict[str, Any],
    ttl: int | None = None,
) -> None

Cache response data

PARAMETER DESCRIPTION
url

Request URL

TYPE: str

params

Request parameters

TYPE: dict[str, Any] | None

data

Response data to cache

TYPE: dict[str, Any]

ttl

Time-to-live in seconds (uses default if not specified)

TYPE: int | None DEFAULT: None

Source code in cocapi/cache.py
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
def set(
    self,
    url: str,
    params: dict[str, Any] | None,
    data: dict[str, Any],
    ttl: int | None = None,
) -> None:
    """
    Store a response in the cache.

    Nothing is stored when caching is disabled or when ``data`` carries
    ``"result": "error"``. A shallow copy of ``data`` is kept, stamped with
    the current time.

    Args:
        url: Request URL
        params: Request parameters
        data: Response data to cache
        ttl: Time-to-live in seconds; a falsy value (None or 0) selects the
            default TTL
    """
    # Skip when disabled, and never keep error responses
    if not self._enabled or data.get("result") == "error":
        return

    key = get_cache_key(url, params)
    effective_ttl = ttl or self.default_ttl

    entry = CacheEntry(data=data.copy(), timestamp=time.time(), ttl=effective_ttl)
    self.cache[key] = entry

    self._stats["sets"] += 1

invalidate

invalidate(
    url: str, params: dict[str, Any] | None = None
) -> bool

Invalidate specific cached entry

PARAMETER DESCRIPTION
url

Request URL

TYPE: str

params

Request parameters

TYPE: dict[str, Any] | None DEFAULT: None

RETURNS DESCRIPTION
bool

True if entry was found and removed, False otherwise

Source code in cocapi/cache.py
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
def invalidate(self, url: str, params: dict[str, Any] | None = None) -> bool:
    """
    Drop the cached entry for one specific request, if there is one.

    A removal is counted as an eviction in the statistics.

    Args:
        url: Request URL
        params: Request parameters

    Returns:
        True if entry was found and removed, False otherwise
    """
    key = get_cache_key(url, params)

    try:
        del self.cache[key]
    except KeyError:
        return False

    self._stats["evictions"] += 1
    return True

clear

clear() -> int

Clear all cached entries

RETURNS DESCRIPTION
int

Number of entries cleared

Source code in cocapi/cache.py
129
130
131
132
133
134
135
136
137
138
139
def clear(self) -> int:
    """
    Empty the cache.

    Every removed entry is counted as an eviction in the statistics.

    Returns:
        Number of entries cleared
    """
    removed = len(self.cache)
    self.cache.clear()
    self._stats["evictions"] = self._stats["evictions"] + removed
    return removed

cleanup_expired

cleanup_expired() -> int

Remove expired entries

RETURNS DESCRIPTION
int

Number of expired entries removed

Source code in cocapi/cache.py
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
def cleanup_expired(self) -> int:
    """
    Purge every entry that has expired as of now.

    All entries are checked against a single timestamp first; the stale
    ones are then deleted and counted as evictions.

    Returns:
        Number of expired entries removed
    """
    now = time.time()

    # Collect keys first so the dict is not modified while being iterated
    stale_keys = []
    for cache_key, entry in self.cache.items():
        if entry.is_expired(now):
            stale_keys.append(cache_key)

    for cache_key in stale_keys:
        del self.cache[cache_key]

    removed = len(stale_keys)
    self._stats["evictions"] += removed
    return removed

get_stats

get_stats() -> dict[str, Any]

Get cache statistics

Source code in cocapi/cache.py
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
def get_stats(self) -> dict[str, Any]:
    """
    Summarise the cache state and its counters.

    Returns:
        A dictionary with the enabled flag, entry counts (total, expired,
        valid), the default TTL, the hit rate as a percentage rounded to two
        decimals (0 when no lookups were recorded), and a copy of the raw
        counters
    """
    hits = self._stats["hits"]
    lookups = hits + self._stats["misses"]
    if lookups > 0:
        hit_rate = hits / lookups * 100
    else:
        hit_rate = 0

    # Entries that are still stored but already past their TTL
    now = time.time()
    stale = len([entry for entry in self.cache.values() if entry.is_expired(now)])
    total = len(self.cache)

    return {
        "enabled": self._enabled,
        "total_entries": total,
        "expired_entries": stale,
        "valid_entries": total - stale,
        "default_ttl": self.default_ttl,
        "hit_rate": round(hit_rate, 2),
        "stats": self._stats.copy(),
    }

get_cache_info

get_cache_info() -> dict[str, Any]

Get detailed cache information

Source code in cocapi/cache.py
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
def get_cache_info(self) -> dict[str, Any]:
    """
    Describe every stored entry.

    Returns:
        A dictionary with ``"entries"`` (one record per cache entry, ordered
        by remaining lifetime, soonest to expire first) and
        ``"memory_usage_estimate"`` (the sum of the per-entry size
        estimates, each being the length of the data's string form)
    """
    from typing import cast

    now = time.time()
    rows = [
        {
            "key": cache_key,
            "size_estimate": len(str(item.data)),
            "ttl": item.ttl,
            "age": round(now - item.timestamp, 2),
            "expires_in": round((item.timestamp + item.ttl) - now, 2),
            "is_expired": item.is_expired(now),
        }
        for cache_key, item in self.cache.items()
    ]

    # Soonest-to-expire first; the sort is stable for equal lifetimes
    rows.sort(key=lambda row: cast(float, row["expires_in"]))

    total_size = sum(row["size_estimate"] for row in rows)
    return {
        "entries": rows,
        "memory_usage_estimate": total_size,
    }

set_default_ttl

set_default_ttl(ttl: int) -> None

Set default TTL for new cache entries

Source code in cocapi/cache.py
211
212
213
def set_default_ttl(self, ttl: int) -> None:
    """
    Change the TTL applied to entries stored without an explicit TTL.

    Only the stored default is replaced; this method does not touch any
    existing entry.

    Args:
        ttl: New default time-to-live in seconds
    """
    self.default_ttl = ttl

get_entry_info

get_entry_info(
    url: str, params: dict[str, Any] | None = None
) -> dict[str, Any] | None

Get information about a specific cache entry

Source code in cocapi/cache.py
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
def get_entry_info(
    self, url: str, params: dict[str, Any] | None = None
) -> dict[str, Any] | None:
    """
    Describe the cache entry for one specific request.

    The entry is only inspected: it is neither removed when expired nor
    counted in the hit/miss statistics.

    Args:
        url: Request URL
        params: Request parameters

    Returns:
        A dictionary with the entry's key, TTL, age, remaining lifetime,
        expiry flag, size estimate and storage timestamp, or None when no
        entry is stored for the request
    """
    key = get_cache_key(url, params)

    try:
        entry = self.cache[key]
    except KeyError:
        return None

    now = time.time()
    age = now - entry.timestamp
    remaining = (entry.timestamp + entry.ttl) - now

    return {
        "key": key,
        "ttl": entry.ttl,
        "age": round(age, 2),
        "expires_in": round(remaining, 2),
        "is_expired": entry.is_expired(now),
        "size_estimate": len(str(entry.data)),
        "cached_at": entry.timestamp,
    }

extend_ttl

extend_ttl(
    url: str,
    params: dict[str, Any] | None = None,
    additional_seconds: int = 300,
) -> bool

Extend TTL for a specific cache entry

PARAMETER DESCRIPTION
url

Request URL

TYPE: str

params

Request parameters

TYPE: dict[str, Any] | None DEFAULT: None

additional_seconds

Seconds to add to current TTL

TYPE: int DEFAULT: 300

RETURNS DESCRIPTION
bool

True if entry was found and updated, False otherwise

Source code in cocapi/cache.py
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
def extend_ttl(
    self,
    url: str,
    params: dict[str, Any] | None = None,
    additional_seconds: int = 300,
) -> bool:
    """
    Extend TTL for a specific cache entry

    Args:
        url: Request URL
        params: Request parameters
        additional_seconds: Seconds to add to current TTL

    Returns:
        True if entry was found and updated, False otherwise
    """
    key = get_cache_key(url, params)

    # Guard clause: nothing to extend when the key is not cached
    if key not in self.cache:
        return False

    self.cache[key].ttl += additional_seconds
    return True

touch

touch(
    url: str, params: dict[str, Any] | None = None
) -> bool

Reset timestamp for cache entry (extends its life without changing TTL)

PARAMETER DESCRIPTION
url

Request URL

TYPE: str

params

Request parameters

TYPE: dict[str, Any] | None DEFAULT: None

RETURNS DESCRIPTION
bool

True if entry was found and touched, False otherwise

Source code in cocapi/cache.py
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
def touch(self, url: str, params: dict[str, Any] | None = None) -> bool:
    """
    Reset timestamp for cache entry (extends its life without changing TTL)

    Args:
        url: Request URL
        params: Request parameters

    Returns:
        True if entry was found and touched, False otherwise
    """
    key = get_cache_key(url, params)

    # Guard clause: nothing to refresh when the key is not cached
    if key not in self.cache:
        return False

    self.cache[key].timestamp = time.time()
    return True

CocApi

CocApi(
    token: str,
    timeout: int = 20,
    status_code: bool = False,
    config: ApiConfig | None = None,
    async_mode: bool = False,
)

Bases: ApiMethods

Clash of Clans API Wrapper with enhanced v3.0.0 features

Provides both sync and async interfaces with caching, metrics, middleware, and dynamic model generation capabilities.

Initialize CocApi with enhanced features

PARAMETER DESCRIPTION
token

API token from developer.clashofclans.com

TYPE: str

timeout

Request timeout in seconds (backward compatibility)

TYPE: int DEFAULT: 20

status_code

Include status code in responses (backward compatibility)

TYPE: bool DEFAULT: False

config

Optional ApiConfig for advanced settings

TYPE: ApiConfig | None DEFAULT: None

async_mode

Enable async mode (default: False for backward compatibility)

TYPE: bool DEFAULT: False

Source code in cocapi/client.py
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
def __init__(
    self,
    token: str,
    timeout: int = 20,
    status_code: bool = False,
    config: ApiConfig | None = None,
    async_mode: bool = False,
):
    """
    Initialize CocApi with enhanced features

    Args:
        token: API token from developer.clashofclans.com
        timeout: Request timeout in seconds (backward compatibility)
        status_code: Include status code in responses (backward compatibility)
        config: Optional ApiConfig for advanced settings
        async_mode: Enable async mode (default: False for backward compatibility)

    Raises:
        ValueError: In sync mode, if the initial connection test reports an error
    """
    # Plain constructor arguments
    self.token = token
    self.timeout = timeout
    self.status_code = status_code
    self.async_mode = async_mode

    # Without an explicit config, build one carrying the timeout parameter
    self.config = config or ApiConfig(timeout=timeout)
    if config is None:
        self.config.timeout = timeout

    self.ENDPOINT = self.config.base_url  # Backward compatibility
    self.headers = {
        "authorization": f"Bearer {token}",
        "Accept": "application/json",
    }

    # Response cache, switched on or off according to the config
    self.cache = CacheManager(default_ttl=self.config.cache_ttl)
    if self.config.enable_caching:
        self.cache.enable()
    else:
        self.cache.disable()

    # Request metrics, switched on or off according to the config
    self.metrics = MetricsTracker(max_metrics=self.config.metrics_window_size)
    if self.config.enable_metrics:
        self.metrics.enable()
    else:
        self.metrics.disable()

    self.middleware = MiddlewareManager()

    # Async core is created lazily by the async context manager
    self._async_core: AsyncCocApiCore | None = None

    # Key manager state (set by from_credentials())
    self._km_email: str | None = None
    self._km_password: str | None = None
    self._km_tokens: list[str] = []

    self.DEFAULT_PARAMS = ("limit", "after", "before")
    self.ERROR_INVALID_PARAM = {
        "result": "error",
        "message": "Invalid params for method",
    }

    # Async mode defers the connection test to __aenter__
    if async_mode:
        return

    probe = self.test()
    if probe.get("result") == "error":
        raise ValueError(f"API initialization failed: {probe.get('message')}")

from_credentials classmethod

from_credentials(
    email: str,
    password: str,
    timeout: int = 20,
    status_code: bool = False,
    config: ApiConfig | None = None,
) -> CocApi

Create a CocApi instance using SuperCell developer portal credentials.

Instead of providing a raw API token, provide your developer portal email and password. Keys are automatically created and managed based on your current public IP.

PARAMETER DESCRIPTION
email

SuperCell developer portal email

TYPE: str

password

SuperCell developer portal password

TYPE: str

timeout

Request timeout in seconds

TYPE: int DEFAULT: 20

status_code

Include status code in responses

TYPE: bool DEFAULT: False

config

Optional ApiConfig for advanced settings

TYPE: ApiConfig | None DEFAULT: None

RETURNS DESCRIPTION
CocApi

Configured CocApi instance with a managed API token

Example::

api = CocApi.from_credentials("email@example.com", "password")
clan = api.clan_tag("#CLAN_TAG")
Source code in cocapi/client.py
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
@classmethod
def from_credentials(
    cls,
    email: str,
    password: str,
    timeout: int = 20,
    status_code: bool = False,
    config: ApiConfig | None = None,
) -> "CocApi":
    """
    Create a CocApi instance using SuperCell developer portal credentials.

    Instead of providing a raw API token, provide your developer portal
    email and password. Keys are automatically created and managed
    based on your current public IP.

    Args:
        email: SuperCell developer portal email
        password: SuperCell developer portal password
        timeout: Request timeout in seconds
        status_code: Include status code in responses
        config: Optional ApiConfig for advanced settings

    Returns:
        Configured CocApi instance with a managed API token

    Example::

        api = CocApi.from_credentials("email@example.com", "password")
        clan = api.clan_tag("#CLAN_TAG")
    """
    from .key_manager import SyncKeyManager

    config = config or ApiConfig(timeout=timeout)

    # Key-related settings all come from the (possibly default) config
    manager_options = {
        "email": email,
        "password": password,
        "key_name": config.key_name,
        "key_count": config.key_count,
        "key_description": config.key_description,
        "persist_keys": config.persist_keys,
        "key_storage_path": config.key_storage_path,
    }

    with SyncKeyManager(**manager_options) as key_manager:
        managed_tokens = key_manager.manage_keys()

    # The first managed token becomes the instance's API token
    api = cls(
        token=managed_tokens[0],
        timeout=timeout,
        status_code=status_code,
        config=config,
    )

    # Remember the credentials and every token on the instance
    api._km_email = email
    api._km_password = password
    api._km_tokens = managed_tokens
    return api

__aenter__ async

__aenter__() -> CocApi

Async context manager entry - enables async mode automatically

Source code in cocapi/client.py
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
async def __aenter__(self) -> "CocApi":
    """
    Async context manager entry - enables async mode automatically

    Returns:
        This CocApi instance, with its async core created and entered

    Raises:
        ValueError: If the async connection test reports an error
    """
    if not self.async_mode:
        self.async_mode = True  # Auto-enable async mode in context

    core = AsyncCocApiCore(self.token, self.config)
    self._async_core = core
    await core.__aenter__()

    # Propagate key manager state for auto-refresh
    km_email = self._km_email
    if km_email is not None:
        core.set_key_manager_state(
            email=km_email,
            password=self._km_password,  # type: ignore[arg-type]
            key_name=self.config.key_name,
            key_count=self.config.key_count,
            auto_refresh=self.config.auto_refresh_keys,
            persist_keys=self.config.persist_keys,
            key_storage_path=self.config.key_storage_path,
        )

    # Test API connection in async mode
    probe = await core.test_connection()
    if probe.get("result") != "error":
        return self

    # Release the async core before reporting the failed connection test
    await self.__aexit__(None, None, None)
    raise ValueError(f"Async API initialization failed: {probe.get('message')}")

__aexit__ async

__aexit__(exc_type: Any, exc_val: Any, exc_tb: Any) -> None

Async context manager exit

Source code in cocapi/client.py
228
229
230
231
232
async def __aexit__(self, exc_type: Any, exc_val: Any, exc_tb: Any) -> None:
    """
    Async context manager exit

    Forwards the exception details to the async core's own exit handler
    and then drops the reference to it. Does nothing when no core is set.
    """
    core = self._async_core
    if not core:
        return

    await core.__aexit__(exc_type, exc_val, exc_tb)
    self._async_core = None

test

test() -> dict[str, Any]

Test API connection

Source code in cocapi/client.py
510
511
512
513
514
515
516
517
518
519
520
521
522
def test(self) -> dict[str, Any]:
    """
    Test API connection

    Returns:
        The error dict returned by the "/locations" request when its result
            is "error", a success dict otherwise, or a "connection" error
            dict when any exception is raised along the way
    """
    try:
        probe = self._sync_api_response("/locations")
        failed = probe.get("result") == "error"
        return (
            probe
            if failed
            else {"result": "success", "message": "API connection successful"}
        )
    except Exception as exc:
        # Deliberate catch-all: failures are reported as a dict, not raised
        return {
            "result": "error",
            "message": f"Connection test failed: {exc!s}",
            "error_type": "connection",
        }

paginate

paginate(
    method: Callable[..., Any], *args: Any, limit: int = 100
) -> Any

Auto-paginate through all results from a list endpoint.

Yields individual items from each page, automatically following the after cursor until all pages are exhausted.

PARAMETER DESCRIPTION
method

An API method that returns paginated results (e.g. api.clan_members).

TYPE: Callable[..., Any]

*args

Positional arguments for the method excluding the params dict (e.g. the clan tag).

TYPE: Any DEFAULT: ()

limit

Items per page (default 100).

TYPE: int DEFAULT: 100

RETURNS DESCRIPTION
Any

Generator (sync) or async generator (async) of individual items.

Example::

for member in api.paginate(api.clan_members, "#TAG"):
    print(member["name"])
Source code in cocapi/client.py
778
779
780
781
782
783
784
785
786
787
788
789
790
791
792
793
794
795
796
797
798
799
800
801
802
803
804
805
806
def paginate(
    self,
    method: Callable[..., Any],
    *args: Any,
    limit: int = 100,
) -> Any:
    """Auto-paginate through all results from a list endpoint.

    Yields individual items from each page, automatically following
    the ``after`` cursor until all pages are exhausted.

    Args:
        method: An API method that returns paginated results
            (e.g. ``api.clan_members``).
        *args: Positional arguments for the method **excluding** the
            ``params`` dict (e.g. the clan tag).
        limit: Items per page (default 100).

    Returns:
        Generator (sync) or async generator (async) of individual items.

    Example::

        for member in api.paginate(api.clan_members, "#TAG"):
            print(member["name"])
    """
    # Pick the async or sync implementation; both take the same arguments
    paginator = self._apaginate if self.async_mode else self._paginate
    return paginator(method, *args, limit=limit)

batch

batch(
    method: Callable[..., Any],
    args_list: list[Any],
    max_concurrent: int | None = None,
) -> list[dict[str, Any]] | Awaitable[list[dict[str, Any]]]

Fetch multiple resources in one call.

PARAMETER DESCRIPTION
method

An API method (e.g. api.players).

TYPE: Callable[..., Any]

args_list

List of arguments — each element is passed to method. Use tuples for methods that take multiple positional args.

TYPE: list[Any]

max_concurrent

Limit concurrent requests in async mode (ignored in sync mode).

TYPE: int | None DEFAULT: None

RETURNS DESCRIPTION
list[dict[str, Any]] | Awaitable[list[dict[str, Any]]]

List of response dicts (sync) or awaitable list (async). Failed calls return their error dict in-place.

Example::

results = api.batch(api.players, ["#TAG1", "#TAG2", "#TAG3"])
Source code in cocapi/client.py
848
849
850
851
852
853
854
855
856
857
858
859
860
861
862
863
864
865
866
867
868
869
870
871
872
873
874
def batch(
    self,
    method: Callable[..., Any],
    args_list: list[Any],
    max_concurrent: int | None = None,
) -> list[dict[str, Any]] | Awaitable[list[dict[str, Any]]]:
    """Fetch multiple resources in one call.

    Args:
        method:         An API method (e.g. ``api.players``).
        args_list:      List of arguments — each element is passed to
                        *method*.  Use tuples for methods that take
                        multiple positional args.
        max_concurrent: Limit concurrent requests in async mode
                        (ignored in sync mode).

    Returns:
        List of response dicts (sync) or awaitable list (async).
        Failed calls return their error dict in-place.

    Example::

        results = api.batch(api.players, ["#TAG1", "#TAG2", "#TAG3"])
    """
    if self.async_mode:
        return self._abatch(method, args_list, max_concurrent)
    return self._batch(method, args_list)

custom_endpoint

custom_endpoint(
    endpoint_path: str,
    params: dict[str, Any] | None = None,
    use_dynamic_model: bool = False,
) -> dict[str, Any] | Awaitable[dict[str, Any]]

Call a custom API endpoint (future-proofing for new SuperCell endpoints)

PARAMETER DESCRIPTION
endpoint_path

The endpoint path (e.g., "/clans/new-feature")

TYPE: str

params

Optional query parameters

TYPE: dict[str, Any] | None DEFAULT: None

use_dynamic_model

Whether to generate dynamic Pydantic models

TYPE: bool DEFAULT: False

RETURNS DESCRIPTION
dict[str, Any] | Awaitable[dict[str, Any]]

API response as dict or Pydantic model

Source code in cocapi/client.py
915
916
917
918
919
920
921
922
923
924
925
926
927
928
929
930
931
932
933
934
935
def custom_endpoint(
    self,
    endpoint_path: str,
    params: dict[str, Any] | None = None,
    use_dynamic_model: bool = False,
) -> dict[str, Any] | Awaitable[dict[str, Any]]:
    """
    Call a custom API endpoint (future-proofing for new SuperCell endpoints)

    Args:
        endpoint_path: The endpoint path (e.g., "/clans/new-feature")
        params: Optional query parameters
        use_dynamic_model: Whether to generate dynamic Pydantic models

    Returns:
        API response as dict or Pydantic model
    """
    if not endpoint_path.startswith("/"):
        endpoint_path = "/" + endpoint_path

    return self._api_response(endpoint_path, params, use_dynamic_model)

set_base_url

set_base_url(
    new_base_url: str, force: bool = False
) -> None

Change the base API URL with safety warnings

PARAMETER DESCRIPTION
new_base_url

New base URL to use

TYPE: str

force

Skip safety warnings (use with caution)

TYPE: bool DEFAULT: False

Source code in cocapi/client.py
937
938
939
940
941
942
943
944
945
946
947
948
949
950
951
952
953
954
955
956
957
958
959
960
961
962
def set_base_url(self, new_base_url: str, force: bool = False) -> None:
    """
    Change the base API URL with safety warnings

    A UserWarning is issued when the new URL differs from the official
    endpoint, unless force is set. The URL is applied in either case.

    Args:
        new_base_url: New base URL to use
        force: Skip safety warnings (use with caution)
    """
    official_url = "https://api.clashofclans.com/v1"
    previous_url = self.config.base_url

    if not force and new_base_url != official_url:
        message = "\n".join(
            [
                "⚠️  WARNING: Changing base URL from official endpoint!",
                f"   Official: {official_url}",
                f"   New URL:  {new_base_url}",
                "   This may result in API failures or unexpected behavior.",
                "   Use force=True to suppress this warning.",
            ]
        )
        # stacklevel=2 attributes the warning to the caller of set_base_url
        warn(message, UserWarning, stacklevel=2)

    self.config.base_url = new_base_url
    self.ENDPOINT = new_base_url  # Update backward compatibility field

    logging.info(f"Base URL changed from {previous_url} to {new_base_url}")

get_base_url

get_base_url() -> str

Get current base URL

Source code in cocapi/client.py
964
965
966
def get_base_url(self) -> str:
    """
    Get current base URL

    Returns:
        The base_url value held by the instance's config
    """
    current_url = self.config.base_url
    return current_url

reset_base_url

reset_base_url() -> None

Reset to official SuperCell API URL

Source code in cocapi/client.py
968
969
970
971
972
973
974
975
976
977
978
979
980
981
982
983
984
def reset_base_url(self) -> None:
    """
    Reset to official SuperCell API URL

    Issues a UserWarning when the URL actually changes; when it was
    already the official endpoint only an info message is logged.
    """
    official_url = "https://api.clashofclans.com/v1"
    previous_url = self.config.base_url

    self.config.base_url = official_url
    self.ENDPOINT = official_url

    # Nothing changed: log and stop without warning
    if previous_url == official_url:
        logging.info("Base URL is already set to the official endpoint")
        return

    # stacklevel=2 attributes the warning to the caller of reset_base_url
    warn(
        f"Base URL reset to official SuperCell endpoint: {official_url}",
        UserWarning,
        stacklevel=2,
    )
    logging.info(f"Base URL reset from {previous_url} to {official_url}")

add_request_middleware

add_request_middleware(
    middleware: Callable[
        [str, dict[str, str], dict[str, Any]],
        tuple[str, dict[str, str], dict[str, Any]],
    ],
) -> None

Add request middleware - delegates to middleware manager

Source code in cocapi/client.py
987
988
989
990
991
992
993
994
995
def add_request_middleware(
    self,
    middleware: Callable[
        [str, dict[str, str], dict[str, Any]],
        tuple[str, dict[str, str], dict[str, Any]],
    ],
) -> None:
    """
    Add request middleware - delegates to middleware manager

    Args:
        middleware: Callable taking (str, dict, dict) and returning a
            tuple of the same three types, per its annotation
    """
    manager = self.middleware
    manager.add_request_middleware(middleware)

add_response_middleware

add_response_middleware(
    middleware: Callable[[dict[str, Any]], dict[str, Any]],
) -> None

Add response middleware - delegates to middleware manager

Source code in cocapi/client.py
 997
 998
 999
1000
1001
def add_response_middleware(
    self, middleware: Callable[[dict[str, Any]], dict[str, Any]]
) -> None:
    """
    Add response middleware - delegates to middleware manager

    Args:
        middleware: Callable taking a dict and returning a dict,
            per its annotation
    """
    manager = self.middleware
    manager.add_response_middleware(middleware)

get_metrics

get_metrics() -> dict[str, Any]

Get API metrics summary

Source code in cocapi/client.py
1004
1005
1006
def get_metrics(self) -> dict[str, Any]:
    """
    Get API metrics summary

    Returns:
        Whatever the metrics tracker's get_metrics_summary() returns
    """
    summary = self.metrics.get_metrics_summary()
    return summary

clear_metrics

clear_metrics() -> None

Clear stored metrics

Source code in cocapi/client.py
1008
1009
1010
def clear_metrics(self) -> None:
    """
    Clear stored metrics

    Delegates to the metrics tracker's clear_metrics().
    """
    tracker = self.metrics
    tracker.clear_metrics()

clear_cache

clear_cache() -> int

Clear API response cache

Source code in cocapi/client.py
1013
1014
1015
def clear_cache(self) -> int:
    """
    Clear API response cache

    Returns:
        The value returned by the cache manager's clear()
    """
    cleared = self.cache.clear()
    return cleared

get_cache_stats

get_cache_stats() -> dict[str, Any]

Get cache statistics

Source code in cocapi/client.py
1017
1018
1019
def get_cache_stats(self) -> dict[str, Any]:
    """
    Get cache statistics

    Returns:
        Whatever the cache manager's get_stats() returns
    """
    stats = self.cache.get_stats()
    return stats

ApiConfig dataclass

ApiConfig(
    base_url: str = "https://api.clashofclans.com/v1",
    timeout: int = 20,
    max_retries: int = 3,
    retry_delay: float = 1.0,
    cache_ttl: int = 300,
    enable_caching: bool = True,
    enable_rate_limiting: bool = True,
    use_pydantic_models: bool = False,
    requests_per_second: float = 10.0,
    burst_limit: int = 20,
    enable_metrics: bool = False,
    metrics_window_size: int = 1000,
    enable_keepalive: bool = True,
    max_connections: int = 10,
    max_keepalive_connections: int = 5,
    key_name: str = "cocapi_auto",
    key_count: int = 1,
    key_description: str = "Auto-generated by cocapi KeyManager",
    auto_refresh_keys: bool = True,
    persist_keys: bool = False,
    key_storage_path: str | None = None,
)

Configuration class for CocApi

CacheEntry dataclass

CacheEntry(data: dict, timestamp: float, ttl: int)

Cache entry with TTL support

is_expired

is_expired(current_time: float) -> bool

Check if cache entry is expired

Source code in cocapi/config.py
53
54
55
def is_expired(self, current_time: float) -> bool:
    """
    Check if cache entry is expired

    Args:
        current_time: Time to evaluate against, on the same clock as
            the entry's timestamp

    Returns:
        True when the entry's age is strictly greater than its TTL
    """
    age = current_time - self.timestamp
    return age > self.ttl

RequestMetric dataclass

RequestMetric(
    endpoint: str,
    method: str,
    status_code: int,
    response_time: float,
    timestamp: float,
    cache_hit: bool,
    error_type: str | None = None,
)

Metrics for a single API request

AsyncKeyManager

AsyncKeyManager(
    email: str,
    password: str,
    key_name: str = "cocapi_auto",
    key_count: int = 1,
    key_description: str = "Auto-generated by cocapi KeyManager",
    key_scopes: str = "clash",
    persist_keys: bool = False,
    key_storage_path: str | None = None,
)

Async key manager for the SuperCell developer portal.

Same functionality as SyncKeyManager but using httpx.AsyncClient.

Usage::

async with AsyncKeyManager("email", "password") as km:
    tokens = await km.manage_keys()
Source code in cocapi/key_manager.py
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
def __init__(
    self,
    email: str,
    password: str,
    key_name: str = "cocapi_auto",
    key_count: int = 1,
    key_description: str = "Auto-generated by cocapi KeyManager",
    key_scopes: str = "clash",
    persist_keys: bool = False,
    key_storage_path: str | None = None,
) -> None:
    """
    Store credentials and key settings; the HTTP client starts unset.

    Args:
        email: Developer portal email
        password: Developer portal password
        key_name: Name used for the keys this manager handles
        key_count: Number of keys wanted, capped at _MAX_KEYS_PER_ACCOUNT
        key_description: Description given to newly created keys
        key_scopes: Scopes value kept on the instance
        persist_keys: Whether tokens are cached locally between runs
        key_storage_path: Cache file location; the module default path
            is used when empty or None
    """
    # Credentials
    self.email = email
    self.password = password

    # Key settings
    self.key_name = key_name
    self.key_count = min(key_count, _MAX_KEYS_PER_ACCOUNT)
    self.key_description = key_description
    self.key_scopes = key_scopes

    # Local token cache
    self.persist_keys = persist_keys
    if key_storage_path:
        self._cache_path = Path(key_storage_path)
    else:
        self._cache_path = _DEFAULT_KEY_STORAGE_PATH

    # Session state
    self._client: httpx.AsyncClient | None = None
    self._logged_in = False
    self._current_ip: str | None = None

close async

close() -> None

Close the underlying async HTTP client.

Source code in cocapi/key_manager.py
597
598
599
600
601
async def close(self) -> None:
    """
    Close the underlying async HTTP client.

    Does nothing when no client is set; otherwise closes it and drops
    the reference.
    """
    client = self._client
    if not client:
        return

    await client.aclose()
    self._client = None

manage_keys async

manage_keys() -> list[str]

Main orchestration: ensure we have valid keys for the current IP. Same logic as SyncKeyManager.manage_keys() but async.

Source code in cocapi/key_manager.py
734
735
736
737
738
739
740
741
742
743
744
745
746
747
748
749
750
751
752
753
754
755
756
757
758
759
760
761
762
763
764
765
766
767
768
769
770
771
772
773
774
775
776
777
778
779
780
781
782
783
784
785
786
787
788
789
790
791
792
793
794
795
796
797
798
799
800
801
802
803
804
805
806
807
808
809
810
811
812
813
814
815
816
async def manage_keys(self) -> list[str]:
    """
    Main orchestration: ensure we have valid keys for the current IP.

    Async counterpart of SyncKeyManager.manage_keys().

    With persist_keys enabled, cached tokens are returned straight away
    when they were saved for the IP detected now. Otherwise the method
    logs in, lists the account's keys, revokes the ones reported as stale
    (wrong IP), creates keys up to key_count as far as free slots allow,
    and saves the result when persistence is enabled.

    Returns:
        List of usable API token strings

    Raises:
        KeyManagerError: If no token is available after the audit
    """
    # Try local cache first (avoids login entirely)
    if self.persist_keys:
        cache_hit = _load_cached_keys(self._cache_path, self.key_name)
        if cache_hit:
            stored_tokens, stored_ip = cache_hit
            detected_ip = await self._detect_ip()
            self._current_ip = detected_ip
            if stored_ip == detected_ip:
                logger.info(
                    "Using %d cached token(s) (IP unchanged: %s)",
                    len(stored_tokens),
                    detected_ip,
                )
                return stored_tokens
            logger.info(
                "IP changed (%s -> %s), cached tokens invalidated",
                stored_ip,
                detected_ip,
            )

    await self._login()

    # The IP may already be known from the cache check above
    if not self._current_ip:
        self._current_ip = await self._detect_ip()

    portal_keys = await self._list_keys()
    valid, stale, unmanaged = _filter_managed_keys(
        portal_keys, self._current_ip, self.key_name
    )

    logger.info(
        "Key audit: %d valid, %d stale (wrong IP), %d unmanaged",
        len(valid),
        len(stale),
        len(unmanaged),
    )

    # Revoke stale keys (wrong IP)
    for stale_key in stale:
        await self._revoke_key(stale_key["id"])

    # Tokens of the keys that are still usable
    tokens = [entry["key"] for entry in valid]

    # Slots occupied once the stale keys are gone
    occupied = len(unmanaged) + len(valid)
    shortfall = self.key_count - len(tokens)

    if shortfall > 0:
        creatable = min(shortfall, _MAX_KEYS_PER_ACCOUNT - occupied)

        if creatable < shortfall:
            logger.warning(
                "Account has %d keys (max %d). Can only create %d of %d needed.",
                occupied,
                _MAX_KEYS_PER_ACCOUNT,
                creatable,
                shortfall,
            )

        for _ in range(creatable):
            # Number the descriptions only when several keys are managed
            if self.key_count > 1:
                suffix = f" #{len(tokens) + 1}"
            else:
                suffix = ""
            created = await self._create_key(
                name=self.key_name,
                description=f"{self.key_description}{suffix}",
            )
            tokens.append(created["key"])

    if not tokens:
        raise KeyManagerError(
            "No usable API keys available. The account may have reached "
            "the 10-key limit with keys from other applications. "
            "Please delete unused keys at https://developer.clashofclans.com"
        )

    # Save to local cache if persistence is enabled
    if self.persist_keys:
        _save_cached_keys(self._cache_path, self.key_name, tokens, self._current_ip)

    logger.info("Key manager ready with %d token(s)", len(tokens))
    return tokens

refresh_keys async

refresh_keys() -> list[str]

Re-detect IP and re-initialize keys.

Source code in cocapi/key_manager.py
818
819
820
821
822
823
824
825
async def refresh_keys(self) -> list[str]:
    """
    Re-detect IP and re-initialize keys.

    Drops the locally cached keys (when persistence is enabled), forgets
    the known IP and login state, then runs manage_keys() again.

    Returns:
        The token list produced by manage_keys()
    """
    logger.info("Refreshing keys due to IP change...")

    if self.persist_keys:
        _invalidate_cached_keys(self._cache_path, self.key_name)

    # Clear both flags so manage_keys() starts from a clean state
    self._current_ip, self._logged_in = None, False

    tokens = await self.manage_keys()
    return tokens

InvalidCredentials

Bases: KeyManagerError

Raised when email/password login fails.

KeyManagerError

Bases: Exception

Base exception for key manager errors.

SyncKeyManager

SyncKeyManager(
    email: str,
    password: str,
    key_name: str = "cocapi_auto",
    key_count: int = 1,
    key_description: str = "Auto-generated by cocapi KeyManager",
    key_scopes: str = "clash",
    persist_keys: bool = False,
    key_storage_path: str | None = None,
)

Synchronous key manager for the SuperCell developer portal.

Handles login, IP detection, and API key lifecycle management using httpx.Client with cookie persistence.

Usage::

with SyncKeyManager("email", "password") as km:
    tokens = km.manage_keys()
    # Use tokens[0] with CocApi

Or via CocApi integration::

api = CocApi.from_credentials("email@example.com", "password")
Source code in cocapi/key_manager.py
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
def __init__(
    self,
    email: str,
    password: str,
    key_name: str = "cocapi_auto",
    key_count: int = 1,
    key_description: str = "Auto-generated by cocapi KeyManager",
    key_scopes: str = "clash",
    persist_keys: bool = False,
    key_storage_path: str | None = None,
) -> None:
    """
    Store credentials and key settings and create the HTTP client.

    Args:
        email: Developer portal email
        password: Developer portal password
        key_name: Name used for the keys this manager handles
        key_count: Number of keys wanted, capped at _MAX_KEYS_PER_ACCOUNT
        key_description: Description given to newly created keys
        key_scopes: Scopes value kept on the instance
        persist_keys: Whether tokens are cached locally between runs
        key_storage_path: Cache file location; the module default path
            is used when empty or None
    """
    # Credentials
    self.email = email
    self.password = password

    # Key settings
    self.key_name = key_name
    self.key_count = min(key_count, _MAX_KEYS_PER_ACCOUNT)
    self.key_description = key_description
    self.key_scopes = key_scopes

    # Local token cache
    self.persist_keys = persist_keys
    if key_storage_path:
        self._cache_path = Path(key_storage_path)
    else:
        self._cache_path = _DEFAULT_KEY_STORAGE_PATH

    # Session state: the client follows redirects
    self._client = httpx.Client(follow_redirects=True)
    self._logged_in = False
    self._current_ip: str | None = None

close

close() -> None

Close the underlying HTTP client.

Source code in cocapi/key_manager.py
292
293
294
def close(self) -> None:
    """
    Close the underlying HTTP client.

    Delegates to the client's own close().
    """
    client = self._client
    client.close()

manage_keys

manage_keys() -> list[str]

Main orchestration: ensure we have valid keys for the current IP.

If persist_keys is enabled, checks the local cache first. When the cached IP matches the current IP, returns cached tokens immediately without contacting the developer portal.

  1. (Optional) Check local cache
  2. Login to developer portal
  3. Detect current public IP
  4. List existing keys
  5. Revoke stale keys (wrong IP), keep valid ones
  6. Create new keys if needed
  7. (Optional) Save tokens to local cache
  8. Return list of usable API token strings
Source code in cocapi/key_manager.py
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
def manage_keys(self) -> list[str]:
    """
    Main orchestration: ensure we have valid keys for the current IP.

    If persist_keys is enabled, checks the local cache first. When the
    cached IP matches the current IP, returns cached tokens immediately
    without contacting the developer portal.

    1. (Optional) Check local cache
    2. Login to developer portal
    3. Detect current public IP
    4. List existing keys
    5. Revoke stale keys (wrong IP), keep valid ones
    6. Create new keys if needed
    7. (Optional) Save tokens to local cache
    8. Return list of usable API token strings

    Returns:
        List of usable API token strings

    Raises:
        KeyManagerError: If no token is available after the audit
    """
    # Try local cache first (avoids login entirely)
    if self.persist_keys:
        cache_hit = _load_cached_keys(self._cache_path, self.key_name)
        if cache_hit:
            stored_tokens, stored_ip = cache_hit
            detected_ip = self._detect_ip()
            self._current_ip = detected_ip
            if stored_ip == detected_ip:
                logger.info(
                    "Using %d cached token(s) (IP unchanged: %s)",
                    len(stored_tokens),
                    detected_ip,
                )
                return stored_tokens
            logger.info(
                "IP changed (%s -> %s), cached tokens invalidated",
                stored_ip,
                detected_ip,
            )

    self._login()

    # The IP may already be known from the cache check above
    if not self._current_ip:
        self._current_ip = self._detect_ip()

    portal_keys = self._list_keys()
    valid, stale, unmanaged = _filter_managed_keys(
        portal_keys, self._current_ip, self.key_name
    )

    logger.info(
        "Key audit: %d valid, %d stale (wrong IP), %d unmanaged",
        len(valid),
        len(stale),
        len(unmanaged),
    )

    # Revoke stale keys (wrong IP)
    for stale_key in stale:
        self._revoke_key(stale_key["id"])

    # Tokens of the keys that are still usable
    tokens = [entry["key"] for entry in valid]

    # Slots occupied once the stale keys are gone
    occupied = len(unmanaged) + len(valid)
    shortfall = self.key_count - len(tokens)

    if shortfall > 0:
        creatable = min(shortfall, _MAX_KEYS_PER_ACCOUNT - occupied)

        if creatable < shortfall:
            logger.warning(
                "Account has %d keys (max %d). Can only create %d of %d needed. "
                "Delete some keys at https://developer.clashofclans.com "
                "or lower key_count.",
                occupied,
                _MAX_KEYS_PER_ACCOUNT,
                creatable,
                shortfall,
            )

        for _ in range(creatable):
            # Number the descriptions only when several keys are managed
            if self.key_count > 1:
                suffix = f" #{len(tokens) + 1}"
            else:
                suffix = ""
            created = self._create_key(
                name=self.key_name,
                description=f"{self.key_description}{suffix}",
            )
            tokens.append(created["key"])

    if not tokens:
        raise KeyManagerError(
            "No usable API keys available. The account may have reached "
            "the 10-key limit with keys from other applications. "
            "Please delete unused keys at https://developer.clashofclans.com"
        )

    # Save to local cache if persistence is enabled
    if self.persist_keys:
        _save_cached_keys(self._cache_path, self.key_name, tokens, self._current_ip)

    logger.info("Key manager ready with %d token(s)", len(tokens))
    return tokens

refresh_keys

refresh_keys() -> list[str]

Re-detect IP and re-initialize keys.

Call this when the API returns accessDenied.invalidIp.

Source code in cocapi/key_manager.py
533
534
535
536
537
538
539
540
541
542
543
544
def refresh_keys(self) -> list[str]:
    """
    Drop remembered key state and run key management again.

    Intended to be called when the API returns accessDenied.invalidIp.

    Returns:
        Whatever manage_keys() returns after the reset.
    """
    logger.info("Refreshing keys due to IP change...")

    # The persisted cache is written together with the current IP, so it is
    # invalidated before anything else when persistence is on.
    if self.persist_keys:
        _invalidate_cached_keys(self._cache_path, self.key_name)

    # Forget the remembered IP and login flag before manage_keys() runs again.
    self._current_ip, self._logged_in = None, False
    return self.manage_keys()

MetricsTracker

MetricsTracker(max_metrics: int = 1000)

Tracks API request metrics and statistics

Initialize metrics tracker

PARAMETER DESCRIPTION
max_metrics

Maximum number of metrics to store (oldest are removed)

TYPE: int DEFAULT: 1000

Source code in cocapi/metrics.py
16
17
18
19
20
21
22
23
24
25
def __init__(self, max_metrics: int = 1000) -> None:
    """
    Initialize metrics tracker

    The tracker starts empty and disabled.

    Args:
        max_metrics: Maximum number of metrics to store (oldest are removed)
    """
    # Upper bound on how many entries are kept in ``metrics``.
    self.max_metrics = max_metrics
    # Recorded request metrics, in the order they were appended.
    self.metrics: list[RequestMetric] = []
    # Tracking is off until enable() is called.
    self._enabled = False

enable

enable() -> None

Enable metrics tracking

Source code in cocapi/metrics.py
27
28
29
def enable(self) -> None:
    """Enable metrics tracking by setting the internal ``_enabled`` flag."""
    self._enabled = True

disable

disable() -> None

Disable metrics tracking

Source code in cocapi/metrics.py
31
32
33
def disable(self) -> None:
    """Disable metrics tracking by clearing the internal ``_enabled`` flag.

    Metrics that were already stored are left untouched.
    """
    self._enabled = False

is_enabled

is_enabled() -> bool

Check if metrics tracking is enabled

Source code in cocapi/metrics.py
35
36
37
def is_enabled(self) -> bool:
    """Check if metrics tracking is enabled.

    Returns:
        The current value of the internal ``_enabled`` flag.
    """
    return self._enabled

record_request

record_request(
    endpoint: str,
    method: str,
    status_code: int,
    response_time: float,
    cache_hit: bool,
    error_type: str | None = None,
) -> None

Record metrics for a request if metrics are enabled

Source code in cocapi/metrics.py
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
def record_request(
    self,
    endpoint: str,
    method: str,
    status_code: int,
    response_time: float,
    cache_hit: bool,
    error_type: str | None = None,
) -> None:
    """
    Record metrics for a request if metrics are enabled.

    Does nothing while tracking is disabled. Otherwise a RequestMetric is
    appended and the stored list is trimmed to the newest ``max_metrics``
    entries. A ``max_metrics`` of zero or less retains nothing.

    Args:
        endpoint: Requested endpoint; passed through
            format_endpoint_for_metrics() before being stored.
        method: Request method, stored as given.
        status_code: Status code recorded for the request.
        response_time: Response time recorded for the request.
        cache_hit: Whether the request was a cache hit.
        error_type: Optional error label for the request.
    """
    if not self._enabled:
        return

    # Format endpoint for better grouping
    formatted_endpoint = format_endpoint_for_metrics(endpoint)

    metric = RequestMetric(
        endpoint=formatted_endpoint,
        method=method,
        status_code=status_code,
        response_time=response_time,
        timestamp=time.time(),
        cache_hit=cache_hit,
        error_type=error_type,
    )

    self.metrics.append(metric)

    # Keep only the most recent metrics. Slicing by the overflow count from
    # the front (rather than ``[-max_metrics:]``) stays correct when
    # max_metrics is 0, where ``[-0:]`` would keep the whole list.
    overflow = len(self.metrics) - self.max_metrics
    if overflow > 0:
        self.metrics = self.metrics[overflow:]

get_metrics_summary

get_metrics_summary() -> dict[str, Any]

Get comprehensive metrics summary

Source code in cocapi/metrics.py
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
def get_metrics_summary(self) -> dict[str, Any]:
    """
    Summarise every stored metric in one dictionary.

    Returns:
        A placeholder dict with ``total_requests`` 0 and a message when
        nothing is stored. Otherwise a dict with the request count, success
        and cache-hit rates (percent, 2 decimals), average response time
        (3 decimals), response-time percentiles, the five most used
        endpoints, status-code and error-type tallies, and the timestamp
        span covered by the stored metrics.
    """
    records = self.metrics
    if not records:
        return {"total_requests": 0, "message": "No metrics available"}

    total_requests = len(records)

    # Rates and average are computed over the records in stored order.
    ok_count = sum(1 for rec in records if 200 <= rec.status_code < 300)
    hit_count = sum(1 for rec in records if rec.cache_hit)
    mean_time = sum(rec.response_time for rec in records) / total_requests

    # Tally endpoints, error types and status codes in a single pass.
    per_endpoint: dict[str, int] = {}
    per_error: dict[str, int] = {}
    per_status: dict[int, int] = {}
    for rec in records:
        per_endpoint[rec.endpoint] = per_endpoint.get(rec.endpoint, 0) + 1
        if rec.error_type:
            per_error[rec.error_type] = per_error.get(rec.error_type, 0) + 1
        per_status[rec.status_code] = per_status.get(rec.status_code, 0) + 1

    # Stable sort: endpoints with equal counts keep first-seen order.
    top_endpoints = sorted(
        per_endpoint.items(), key=lambda pair: pair[1], reverse=True
    )[:5]

    ordered_times = sorted(rec.response_time for rec in records)
    percentiles = self._calculate_percentiles(ordered_times)

    stamps = [rec.timestamp for rec in records]
    first_seen, last_seen = min(stamps), max(stamps)

    return {
        "total_requests": total_requests,
        "success_rate": round((ok_count / total_requests) * 100, 2),
        "cache_hit_rate": round((hit_count / total_requests) * 100, 2),
        "average_response_time": round(mean_time, 3),
        "response_time_percentiles": percentiles,
        "most_used_endpoints": top_endpoints,
        "status_code_breakdown": per_status,
        "error_breakdown": per_error,
        "timespan": {
            "start": first_seen,
            "end": last_seen,
            "duration_seconds": last_seen - first_seen,
        },
    }

get_endpoint_metrics

get_endpoint_metrics(endpoint: str) -> dict[str, Any]

Get metrics for a specific endpoint

Source code in cocapi/metrics.py
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
def get_endpoint_metrics(self, endpoint: str) -> dict[str, Any]:
    """
    Summarise the stored metrics for one endpoint.

    The endpoint is passed through format_endpoint_for_metrics() and
    compared with the ``endpoint`` field of each stored metric.

    Args:
        endpoint: Endpoint to look up, as the caller knows it.

    Returns:
        A placeholder dict with ``total_requests`` 0 and a message when no
        stored metric matches. Otherwise a dict with the original and
        formatted endpoint, request count, success and cache-hit rates
        (percent, 2 decimals), average response time (3 decimals) and
        response-time percentiles.
    """
    formatted_endpoint = format_endpoint_for_metrics(endpoint)
    matching = [rec for rec in self.metrics if rec.endpoint == formatted_endpoint]
    total_requests = len(matching)

    if total_requests == 0:
        return {
            "endpoint": endpoint,
            "total_requests": 0,
            "message": "No metrics available for this endpoint",
        }

    ok_count = sum(1 for rec in matching if 200 <= rec.status_code < 300)
    hit_count = sum(1 for rec in matching if rec.cache_hit)
    mean_time = sum(rec.response_time for rec in matching) / total_requests

    ordered_times = sorted(rec.response_time for rec in matching)
    percentiles = self._calculate_percentiles(ordered_times)

    return {
        "endpoint": endpoint,
        "formatted_endpoint": formatted_endpoint,
        "total_requests": total_requests,
        "success_rate": round((ok_count / total_requests) * 100, 2),
        "cache_hit_rate": round((hit_count / total_requests) * 100, 2),
        "average_response_time": round(mean_time, 3),
        "response_time_percentiles": percentiles,
    }

get_recent_errors

get_recent_errors(limit: int = 10) -> list[dict[str, Any]]

Get recent error requests

Source code in cocapi/metrics.py
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
def get_recent_errors(self, limit: int = 10) -> list[dict[str, Any]]:
    """
    List the most recent failed requests, newest first.

    A stored metric counts as an error when its status code is 400 or
    higher, or when it carries a truthy ``error_type``.

    Args:
        limit: Slice bound applied to the sorted error list.

    Returns:
        One dict per error with the endpoint, method, status code, error
        type, response time and timestamp.
    """
    # sorted() is stable, so errors with equal timestamps keep stored order.
    failures = sorted(
        (rec for rec in self.metrics if rec.status_code >= 400 or rec.error_type),
        key=lambda rec: rec.timestamp,
        reverse=True,
    )

    exported_fields = (
        "endpoint",
        "method",
        "status_code",
        "error_type",
        "response_time",
        "timestamp",
    )
    return [
        {name: getattr(rec, name) for name in exported_fields}
        for rec in failures[:limit]
    ]

clear_metrics

clear_metrics() -> None

Clear all stored metrics

Source code in cocapi/metrics.py
189
190
191
def clear_metrics(self) -> None:
    """Clear all stored metrics.

    The ``metrics`` list is emptied in place; the enabled flag and
    ``max_metrics`` are not changed.
    """
    self.metrics.clear()

export_metrics_csv

export_metrics_csv() -> str

Export metrics as CSV string

Source code in cocapi/metrics.py
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
def export_metrics_csv(self) -> str:
    """
    Export metrics as CSV string.

    The first line is a header row; each following line is one stored
    metric in stored order. Lines are separated by ``\\n`` and there is no
    trailing newline. Fields containing a comma, quote or line break are
    quoted by the ``csv`` module so the output stays parseable.

    Returns:
        The CSV text, or the literal string "No metrics to export" when
        nothing is stored.
    """
    import csv
    import io

    if not self.metrics:
        return "No metrics to export"

    buffer = io.StringIO()
    # lineterminator="\n" keeps the line separator used by earlier output.
    writer = csv.writer(buffer, lineterminator="\n")
    writer.writerow(
        [
            "endpoint",
            "method",
            "status_code",
            "response_time",
            "timestamp",
            "cache_hit",
            "error_type",
        ]
    )
    writer.writerows(
        (
            metric.endpoint,
            metric.method,
            metric.status_code,
            metric.response_time,
            metric.timestamp,
            metric.cache_hit,
            metric.error_type or "",
        )
        for metric in self.metrics
    )

    # The writer terminates every row; drop only the final terminator.
    return buffer.getvalue().removesuffix("\n")

get_performance_insights

get_performance_insights() -> dict[str, Any]

Get performance insights and recommendations

Source code in cocapi/metrics.py
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
def get_performance_insights(self) -> dict[str, Any]:
    """
    Derive insights and recommendations from the metrics summary.

    The summary from get_metrics_summary() is checked against fixed
    thresholds for cache hit rate, success rate, average response time and
    the share of the most common error type. Each finding is a dict with
    ``type``, ``message`` and ``severity`` keys.

    Returns:
        A dict holding only a message when no metrics are stored. Otherwise
        a dict with the ``insights`` list and the ``recommendations``
        produced by _generate_recommendations() for those insights.
    """
    if not self.metrics:
        return {"message": "No metrics available for insights"}

    summary = self.get_metrics_summary()
    insights: list[dict[str, Any]] = []

    def report(kind: str, message: str, severity: str) -> None:
        # Every finding shares the same three-key shape.
        insights.append({"type": kind, "message": message, "severity": severity})

    # Cache effectiveness: below 30% is flagged, above 80% is praised.
    if summary["cache_hit_rate"] < 30:
        report(
            "performance",
            f"Low cache hit rate ({summary['cache_hit_rate']}%). Consider increasing cache TTL or reviewing caching strategy.",
            "medium",
        )
    elif summary["cache_hit_rate"] > 80:
        report(
            "performance",
            f"Excellent cache hit rate ({summary['cache_hit_rate']}%)!",
            "info",
        )

    # Reliability: anything under 95% is reported, under 90% as high severity.
    if summary["success_rate"] < 95:
        report(
            "reliability",
            f"Success rate is {summary['success_rate']}%. Review error patterns.",
            "high" if summary["success_rate"] < 90 else "medium",
        )

    # Latency: averages above two seconds are reported.
    if summary["average_response_time"] > 2.0:
        report(
            "performance",
            f"High average response time ({summary['average_response_time']}s). Consider optimizing requests.",
            "medium",
        )

    # Dominant error: reported when it exceeds 10% of the stored requests.
    error_breakdown = summary.get("error_breakdown", {})
    if error_breakdown:
        most_common_error = max(error_breakdown.items(), key=lambda x: x[1])
        frequent_threshold = len(self.metrics) * 0.1
        if most_common_error[1] > frequent_threshold:
            report(
                "reliability",
                f"Frequent {most_common_error[0]} errors ({most_common_error[1]} occurrences). Investigate root cause.",
                "high",
            )

    recommendations = self._generate_recommendations(insights)
    return {"insights": insights, "recommendations": recommendations}

MiddlewareManager

MiddlewareManager()

Manages request and response middleware

Source code in cocapi/middleware.py
13
14
15
16
17
18
19
20
def __init__(self) -> None:
    """Create a manager with no request or response middleware registered."""
    # Request hooks: each takes (url, headers, params) and returns a
    # (url, headers, params) tuple.
    self.request_middleware: list[
        Callable[
            [str, dict[str, str], dict[str, Any]],
            tuple[str, dict[str, str], dict[str, Any]],
        ]
    ] = []
    # Response hooks: each takes a response dict and returns a response dict.
    self.response_middleware: list[Callable[[dict[str, Any]], dict[str, Any]]] = []

add_request_middleware

add_request_middleware(
    middleware: Callable[
        [str, dict[str, str], dict[str, Any]],
        tuple[str, dict[str, str], dict[str, Any]],
    ],
) -> None

Add middleware function to process requests before they're sent.

PARAMETER DESCRIPTION
middleware

Function that takes (url, headers, params) and returns modified versions

TYPE: Callable[[str, dict[str, str], dict[str, Any]], tuple[str, dict[str, str], dict[str, Any]]]

Examples:

def add_custom_header(url, headers, params):
    headers = headers.copy()
    headers['X-Custom'] = 'MyApp'
    return url, headers, params

manager.add_request_middleware(add_custom_header)

Source code in cocapi/middleware.py
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
def add_request_middleware(
    self,
    middleware: Callable[
        [str, dict[str, str], dict[str, Any]],
        tuple[str, dict[str, str], dict[str, Any]],
    ],
) -> None:
    """
    Add middleware function to process requests before they're sent.

    The function is appended to ``request_middleware``, so it runs after
    every middleware registered earlier.

    Args:
        middleware: Function that takes (url, headers, params) and returns
            the (possibly modified) (url, headers, params) tuple

    Examples:
        def add_custom_header(url, headers, params):
            headers = headers.copy()
            headers['X-Custom'] = 'MyApp'
            return url, headers, params

        manager.add_request_middleware(add_custom_header)
    """
    self.request_middleware.append(middleware)

add_response_middleware

add_response_middleware(
    middleware: Callable[[dict[str, Any]], dict[str, Any]],
) -> None

Add middleware function to process responses after they're received.

PARAMETER DESCRIPTION
middleware

Function that takes response dict and returns modified version

TYPE: Callable[[dict[str, Any]], dict[str, Any]]

Examples:

def add_timestamp(response):
    response['_processed_at'] = time.time()
    return response

manager.add_response_middleware(add_timestamp)

Source code in cocapi/middleware.py
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
def add_response_middleware(
    self, middleware: Callable[[dict[str, Any]], dict[str, Any]]
) -> None:
    """
    Add middleware function to process responses after they're received.

    The function is appended to ``response_middleware``, so it runs after
    every middleware registered earlier.

    Args:
        middleware: Function that takes response dict and returns modified version

    Examples:
        def add_timestamp(response):
            response['_processed_at'] = time.time()
            return response

        manager.add_response_middleware(add_timestamp)
    """
    self.response_middleware.append(middleware)

apply_request_middleware

apply_request_middleware(
    url: str,
    headers: dict[str, str],
    params: dict[str, Any],
) -> tuple[str, dict[str, str], dict[str, Any]]

Apply all request middleware in order

Source code in cocapi/middleware.py
63
64
65
66
67
68
69
70
71
72
def apply_request_middleware(
    self, url: str, headers: dict[str, str], params: dict[str, Any]
) -> tuple[str, dict[str, str], dict[str, Any]]:
    """
    Run the request through every registered request middleware, in order.

    Each middleware receives the values produced by the previous one. A
    middleware that raises, or whose result cannot be unpacked into three
    values, is logged as a warning and skipped; the values it was given
    are passed on unchanged.

    Returns:
        The final (url, headers, params) tuple.
    """
    for hook in self.request_middleware:
        try:
            # Unpack inside the try so a wrongly shaped result is skipped too.
            next_url, next_headers, next_params = hook(url, headers, params)
        except Exception as e:
            logging.warning(f"Request middleware failed: {e}")
            continue
        url, headers, params = next_url, next_headers, next_params
    return url, headers, params

apply_response_middleware

apply_response_middleware(
    response: dict[str, Any],
) -> dict[str, Any]

Apply all response middleware in order

Source code in cocapi/middleware.py
74
75
76
77
78
79
80
81
def apply_response_middleware(self, response: dict[str, Any]) -> dict[str, Any]:
    """
    Run the response through every registered response middleware, in order.

    Each middleware receives the value returned by the previous one. A
    middleware that raises is logged as a warning and skipped, leaving the
    response as it was before that middleware ran.

    Returns:
        The response after all middleware have been applied.
    """
    for hook in self.response_middleware:
        try:
            updated = hook(response)
        except Exception as e:
            logging.warning(f"Response middleware failed: {e}")
        else:
            response = updated
    return response

clear_request_middleware

clear_request_middleware() -> None

Clear all request middleware

Source code in cocapi/middleware.py
83
84
85
def clear_request_middleware(self) -> None:
    """Clear all request middleware.

    Empties the ``request_middleware`` list in place; response middleware
    is not affected.
    """
    self.request_middleware.clear()

clear_response_middleware

clear_response_middleware() -> None

Clear all response middleware

Source code in cocapi/middleware.py
87
88
89
def clear_response_middleware(self) -> None:
    """Clear all response middleware.

    Empties the ``response_middleware`` list in place; request middleware
    is not affected.
    """
    self.response_middleware.clear()

clear_all_middleware

clear_all_middleware() -> None

Clear all middleware

Source code in cocapi/middleware.py
91
92
93
94
def clear_all_middleware(self) -> None:
    """Clear all middleware.

    Delegates to clear_request_middleware() and then
    clear_response_middleware().
    """
    self.clear_request_middleware()
    self.clear_response_middleware()

get_middleware_info

get_middleware_info() -> dict[str, Any]

Get information about registered middleware

Source code in cocapi/middleware.py
 96
 97
 98
 99
100
101
102
103
104
105
106
107
def get_middleware_info(self) -> dict[str, Any]:
    """
    Describe the middleware currently registered.

    Returns:
        A dict with the number of request and response middleware and, for
        each kind, the list of their ``__name__`` values in registration
        order. Entries without a ``__name__`` are reported as "anonymous".
    """

    def names_of(hooks: list[Any]) -> list[str]:
        return [getattr(hook, "__name__", "anonymous") for hook in hooks]

    request_hooks = self.request_middleware
    response_hooks = self.response_middleware
    return {
        "request_middleware_count": len(request_hooks),
        "response_middleware_count": len(response_hooks),
        "request_middleware_names": names_of(request_hooks),
        "response_middleware_names": names_of(response_hooks),
    }

Achievement

Bases: BaseModel

A player achievement with progress and star count.

BadgeUrls

Bases: BaseModel

Clan badge image URLs in multiple sizes.

BuilderBaseLeague

Bases: BaseModel

Builder Base league ranking.

CapitalLeague

Bases: BaseModel

Clan Capital league ranking.

ChatLanguage

Bases: BaseModel

A clan's configured chat language.

Clan

Bases: BaseModel

Full clan profile from the /clans/{tag} endpoint.

ClanBuilderBaseRankingEntry

Bases: BaseModel

A clan's entry in the Builder Base rankings leaderboard.

ClanCapital

Bases: BaseModel

Clan Capital data including hall level and districts.

ClanCapitalRaidSeason

Bases: BaseModel

A Clan Capital raid season with attack/defense logs and rewards.

ClanCapitalRankingEntry

Bases: BaseModel

A clan's entry in the Clan Capital rankings leaderboard.

ClanMember

Bases: BaseModel

A member entry within a clan's member list.

ClanRankingEntry

Bases: BaseModel

A clan's entry in the trophy rankings leaderboard.

ClanSearchEntry

Bases: BaseModel

Clan entry returned from the search endpoint (no memberList).

ClanWar

Bases: BaseModel

Current or past clan war from the /clans/{tag}/currentwar endpoint.

ClanWarLeagueClan

Bases: BaseModel

A clan participating in a Clan War League group.

ClanWarLeagueGroup

Bases: BaseModel

Full Clan War League group with clans and rounds.

ClanWarLeagueMember

Bases: BaseModel

A member in a Clan War League group.

ClanWarLeagueRound

Bases: BaseModel

A round within a Clan War League season containing war tags.

ClanWarLogEntry

Bases: BaseModel

A single entry in a clan's war log.

GoldPassSeason

Bases: BaseModel

Current Gold Pass season start and end times.

Hero

Bases: BaseModel

A hero unit (e.g. Barbarian King, Archer Queen).

HeroEquipment

Bases: BaseModel

Equipment item that can be assigned to a hero.

IconUrls

Bases: BaseModel

Icon image URLs for leagues, labels, and other entities.

Label

Bases: BaseModel

A label that can be assigned to clans or players.

League

Bases: BaseModel

A player's trophy league (e.g. Legend League, Titan I).

LeagueSeason

Bases: BaseModel

A league season identifier (e.g. 2026-02).

LeagueTier

Bases: BaseModel

A tier within a league (sub-division).

Location

Bases: BaseModel

A geographic location (country or region) used for rankings.

Paging

Bases: BaseModel

Paging metadata returned with paginated list responses.

Player

Bases: BaseModel

Full player profile from the /players/{tag} endpoint.

PlayerBuilderBaseRankingEntry

Bases: BaseModel

A player's entry in the Builder Base rankings leaderboard.

PlayerClan

Bases: BaseModel

Compact clan info embedded in player responses.

PlayerHouse

Bases: BaseModel

Player house decoration data.

PlayerRankingEntry

Bases: BaseModel

A player's entry in the trophy rankings leaderboard.

Spell

Bases: BaseModel

A spell in a player's spell factory.

Troop

Bases: BaseModel

A troop in a player's army (home or builder village).

VerifyTokenResponse

Bases: BaseModel

Response from the player token verification endpoint.

WarAttack

Bases: BaseModel

A single attack in a clan war.

WarClan

Bases: BaseModel

A clan's war data including members and attack summary.

WarLeague

Bases: BaseModel

Clan War League tier (e.g. Champion I, Master II).

WarMember

Bases: BaseModel

A clan member participating in a war.

add_debug_logging_middleware

add_debug_logging_middleware() -> Callable[
    [str, dict[str, str], dict[str, Any]],
    tuple[str, dict[str, str], dict[str, Any]],
]

Create middleware that logs request details

Source code in cocapi/middleware.py
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
def add_debug_logging_middleware() -> Callable[
    [str, dict[str, str], dict[str, Any]], tuple[str, dict[str, str], dict[str, Any]]
]:
    """
    Create middleware that logs request details at DEBUG level.

    The URL, headers and params of every request are logged through the
    root logger. Values of credential-bearing headers (Authorization,
    Proxy-Authorization, Cookie; matched case-insensitively) are replaced
    with "<redacted>" in the log output so the API token is not written to
    logs. The request itself is returned unchanged.

    Returns:
        A request middleware named "debug_logging".
    """
    sensitive_headers = frozenset({"authorization", "proxy-authorization", "cookie"})

    def middleware(
        url: str, headers: dict[str, str], params: dict[str, Any]
    ) -> tuple[str, dict[str, str], dict[str, Any]]:
        # Mask secrets in a copy used only for logging; the headers object
        # handed back to the caller is the original one.
        if isinstance(headers, dict):
            loggable_headers: Any = {
                name: "<redacted>" if str(name).lower() in sensitive_headers else value
                for name, value in headers.items()
            }
        else:
            loggable_headers = headers

        logging.debug("API Request: %s", url)
        logging.debug("Headers: %s", loggable_headers)
        logging.debug("Params: %s", params)
        return url, headers, params

    middleware.__name__ = "debug_logging"
    return middleware

add_request_id_middleware

add_request_id_middleware() -> Callable[
    [str, dict[str, str], dict[str, Any]],
    tuple[str, dict[str, str], dict[str, Any]],
]

Create middleware that adds unique request ID to headers

Source code in cocapi/middleware.py
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
def add_request_id_middleware() -> Callable[
    [str, dict[str, str], dict[str, Any]], tuple[str, dict[str, str], dict[str, Any]]
]:
    """
    Create middleware that adds unique request ID to headers.

    Each call of the returned middleware copies the headers and sets
    "X-Request-ID" on the copy to a freshly generated UUID4 string. The
    headers passed in are not modified.

    Returns:
        A request middleware named "request_id".
    """
    import uuid

    def middleware(
        url: str, headers: dict[str, str], params: dict[str, Any]
    ) -> tuple[str, dict[str, str], dict[str, Any]]:
        request_id = str(uuid.uuid4())
        tagged_headers = headers.copy()
        tagged_headers["X-Request-ID"] = request_id
        return url, tagged_headers, params

    middleware.__name__ = "request_id"
    return middleware

add_response_size_middleware

add_response_size_middleware() -> Callable[
    [dict[str, Any]], dict[str, Any]
]

Create middleware that adds response size information

Source code in cocapi/middleware.py
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
def add_response_size_middleware() -> Callable[[dict[str, Any]], dict[str, Any]]:
    """
    Create middleware that adds response size information.

    For dict responses the returned middleware works on a shallow copy and
    stores the byte length of its UTF-8 encoded JSON form under
    "_response_size_bytes". If that computation fails, the length of
    ``str()`` of the copy is stored instead. Non-dict responses are
    returned as they are.

    Returns:
        A response middleware named "response_size".
    """
    import json

    def middleware(response: dict[str, Any]) -> dict[str, Any]:
        if not isinstance(response, dict):
            return response

        sized = response.copy()
        try:
            encoded = json.dumps(sized).encode("utf-8")
            sized["_response_size_bytes"] = len(encoded)
        except Exception:
            # If serialization fails, estimate size
            sized["_response_size_bytes"] = len(str(sized))
        return sized

    middleware.__name__ = "response_size"
    return middleware

add_response_timestamp_middleware

add_response_timestamp_middleware() -> Callable[
    [dict[str, Any]], dict[str, Any]
]

Create middleware that adds processing timestamp to response

Source code in cocapi/middleware.py
163
164
165
166
167
168
169
170
171
172
173
174
def add_response_timestamp_middleware() -> Callable[[dict[str, Any]], dict[str, Any]]:
    """
    Create middleware that adds processing timestamp to response.

    For dict responses the returned middleware returns a shallow copy with
    "_processed_at" set to ``time.time()`` at the moment it runs. Non-dict
    responses are returned as they are.

    Returns:
        A response middleware named "response_timestamp".
    """
    import time

    def middleware(response: dict[str, Any]) -> dict[str, Any]:
        if not isinstance(response, dict):
            return response

        stamped = response.copy()
        stamped["_processed_at"] = time.time()
        return stamped

    middleware.__name__ = "response_timestamp"
    return middleware

add_user_agent_middleware

add_user_agent_middleware(
    user_agent: str,
) -> Callable[
    [str, dict[str, str], dict[str, Any]],
    tuple[str, dict[str, str], dict[str, Any]],
]

Create middleware that adds custom User-Agent header

Source code in cocapi/middleware.py
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
def add_user_agent_middleware(
    user_agent: str,
) -> Callable[
    [str, dict[str, str], dict[str, Any]], tuple[str, dict[str, str], dict[str, Any]]
]:
    """
    Create middleware that adds custom User-Agent header.

    The returned middleware copies the headers and sets "User-Agent" on the
    copy; the headers passed in are not modified.

    Args:
        user_agent: Value to send in the User-Agent header.

    Returns:
        A request middleware named "user_agent_" followed by the user agent
        with spaces replaced by underscores.
    """

    def middleware(
        url: str, headers: dict[str, str], params: dict[str, Any]
    ) -> tuple[str, dict[str, str], dict[str, Any]]:
        with_agent = headers.copy()
        with_agent["User-Agent"] = user_agent
        return url, with_agent, params

    middleware.__name__ = f"user_agent_{user_agent.replace(' ', '_')}"
    return middleware

create_dynamic_model

create_dynamic_model(
    response: dict[str, Any], endpoint_type: str
) -> dict[str, Any] | Any

Create a Pydantic model from JSON response.

First tries to match a concrete model from the schema registry. Falls back to dynamic model generation for unknown endpoints.

PARAMETER DESCRIPTION
response

The JSON response data

TYPE: dict[str, Any]

endpoint_type

Type of endpoint (clan, player, etc.)

TYPE: str

RETURNS DESCRIPTION
dict[str, Any] | Any

Either a Pydantic model instance or the original dict

Source code in cocapi/models.py
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
def create_dynamic_model(
    response: dict[str, Any], endpoint_type: str
) -> dict[str, Any] | Any:
    """
    Turn a JSON response into a Pydantic model where possible.

    The response is returned unchanged when Pydantic is unavailable or when
    its "result" field equals "error". Otherwise a concrete model from the
    model registry is tried first, and a model generated from the response
    structure is the fallback. If generation fails, a warning is logged and
    the original dict is returned.

    Args:
        response: The JSON response data
        endpoint_type: Type of endpoint (clan, player, etc.)

    Returns:
        Either a Pydantic model instance or the original dict
    """
    if not PYDANTIC_AVAILABLE:
        logging.warning("Pydantic not available - returning dict response")
        return response

    is_error_payload = response.get("result") == "error"
    if is_error_payload:
        return response

    # 1. Concrete model from the schema registry. An ImportError from either
    #    the import or the lookup falls through to dynamic generation.
    try:
        from .model_registry import resolve_response

        concrete = resolve_response(response, endpoint_type)
    except ImportError:
        concrete = None
    if concrete is not None:
        return concrete

    # 2. Dynamic model built from the shape of this particular response.
    try:
        model_name = _generate_model_name(endpoint_type)
        field_definitions = _analyze_response_structure(response)
        return create_model(model_name, **field_definitions)(**response)
    except Exception as e:
        logging.warning(f"Failed to create dynamic model: {e}")
        return response

get_pydantic_info

get_pydantic_info() -> dict[str, Any]

Get information about Pydantic availability and version

Source code in cocapi/models.py
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
def get_pydantic_info() -> dict[str, Any]:
    """
    Report whether Pydantic can be used, and which version is installed.

    Returns:
        A dict with ``available`` (bool), ``version`` (the value of
        ``pydantic.VERSION`` or None) and a human-readable ``message``.
    """
    if PYDANTIC_AVAILABLE:
        try:
            import pydantic

            return {
                "available": True,
                "version": pydantic.VERSION,
                "message": "Pydantic available for dynamic model generation",
            }
        except Exception as e:
            # The availability flag was set, yet importing failed here.
            return {
                "available": False,
                "version": None,
                "message": f"Pydantic import error: {e}",
            }

    return {
        "available": False,
        "version": None,
        "message": "Pydantic not installed. Install with: pip install 'cocapi[pydantic]'",
    }

validate_pydantic_available

validate_pydantic_available() -> bool

Check if Pydantic is available for dynamic model creation

Source code in cocapi/models.py
122
123
124
def validate_pydantic_available() -> bool:
    """Check if Pydantic is available for dynamic model creation.

    Returns:
        The module-level ``PYDANTIC_AVAILABLE`` flag.
    """
    return PYDANTIC_AVAILABLE

build_url

build_url(
    base_url: str,
    endpoint: str,
    params: dict[str, Any] | None = None,
) -> str

Build a complete URL with parameters

Source code in cocapi/utils.py
14
15
16
17
18
19
20
21
22
23
24
def build_url(
    base_url: str, endpoint: str, params: dict[str, Any] | None = None
) -> str:
    """Build a complete URL with parameters"""
    if params:
        # Filter out None values and empty parameters
        filtered_params = {k: v for k, v in params.items() if v is not None and v != ""}
        if filtered_params:
            query_string = urllib.parse.urlencode(filtered_params)
            return f"{base_url}{endpoint}?{query_string}"
    return f"{base_url}{endpoint}"

clean_tag

clean_tag(tag: str) -> str

Remove # prefix from clan/player tags if present

Source code in cocapi/utils.py
 9
10
11
def clean_tag(tag: str) -> str:
    """Strip a single leading "#" from a clan/player tag, if there is one.

    Only the first character is affected; a tag without the prefix is
    returned unchanged.
    """
    return tag.removeprefix("#")

validate_params

validate_params(
    params: dict[str, Any] | None,
    valid_params: tuple[str, ...],
) -> bool

Validate that all parameters are in the allowed list

Source code in cocapi/utils.py
27
28
29
30
31
32
33
def validate_params(
    params: dict[str, Any] | None, valid_params: tuple[str, ...]
) -> bool:
    """Validate that all parameters are in the allowed list"""
    if not params:
        return True
    return all(param in valid_params for param in params.keys())