Skip to content

async_client

async_client

Async functionality for cocapi

AsyncRateLimiter

AsyncRateLimiter(rate: float, burst: int)

A simple async rate limiter that uses the token bucket algorithm

Initialize rate limiter

PARAMETER DESCRIPTION
rate

Requests per second

TYPE: float

burst

Maximum burst requests

TYPE: int

Source code in cocapi/async_client.py
23
24
25
26
27
28
29
30
31
32
33
34
35
def __init__(self, rate: float, burst: int):
    """
    Set up a token bucket that starts out full.

    Args:
        rate: Requests per second (how fast tokens are refilled)
        burst: Maximum burst requests (the bucket capacity)
    """
    # Guards the bucket state; acquire() holds it for its whole body.
    self._lock = asyncio.Lock()
    # Wall-clock timestamp of the most recent refill computation.
    self.last_update = time.time()
    self.rate, self.burst = rate, burst
    # Stored as a float because refills add fractional tokens.
    self.tokens = float(burst)

acquire async

acquire() -> None

Acquire permission to make a request

Source code in cocapi/async_client.py
37
38
39
40
41
42
43
44
45
46
47
48
49
50
async def acquire(self) -> None:
    """
    Acquire permission to make a request

    Refills the bucket for the time elapsed since the last update (capped
    at ``burst``), then either consumes one token immediately or sleeps
    until one token has accrued. The lock is held for the whole call,
    including the sleep, so waiting callers are admitted one at a time.
    """
    async with self._lock:
        now = time.time()
        # time.time() is wall-clock and may step backwards; a negative
        # elapsed time would drain tokens and trigger a very long sleep.
        elapsed = max(0.0, now - self.last_update)
        self.tokens = min(float(self.burst), self.tokens + elapsed * self.rate)
        self.last_update = now

        if self.tokens >= 1:
            self.tokens -= 1
            return

        sleep_time = (1 - self.tokens) / self.rate
        await asyncio.sleep(sleep_time)
        # The token that accrued while sleeping is spent by this request.
        # Move last_update to the instant that token became available so
        # the next caller is not credited a second time for the sleep.
        self.tokens = 0.0
        self.last_update = now + sleep_time

AsyncCocApiCore

AsyncCocApiCore(token: str, config: ApiConfig)

Core async functionality for CocApi

Initialize async core

PARAMETER DESCRIPTION
token

API token

TYPE: str

config

Configuration object

TYPE: ApiConfig

Source code in cocapi/async_client.py
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
def __init__(self, token: str, config: ApiConfig):
    """
    Build the async core from an API token and a configuration object.

    Args:
        token: API token
        config: Configuration object
    """
    self.token, self.config = token, config
    self.headers = {
        "authorization": f"Bearer {token}",
        "Accept": "application/json",
    }

    # Response cache, switched on or off according to the configuration
    self.cache = CacheManager(default_ttl=config.cache_ttl)
    if config.enable_caching:
        self.cache.enable()
    else:
        self.cache.disable()

    # Request metrics, switched on or off according to the configuration
    self.metrics = MetricsTracker(max_metrics=config.metrics_window_size)
    if config.enable_metrics:
        self.metrics.enable()
    else:
        self.metrics.disable()

    self.middleware = MiddlewareManager()

    # HTTP client is created lazily in __aenter__; the flag records
    # whether this object created it and therefore must close it.
    self._client: httpx.AsyncClient | None = None
    self._should_close_client = False

    # Key manager state, defaults until set_key_manager_state() is called
    self._km_email: str | None = None
    self._km_password: str | None = None
    self._km_key_name: str | None = None
    self._km_key_count: int = 1
    self._km_auto_refresh: bool = False
    self._km_persist_keys: bool = False
    self._km_key_storage_path: str | None = None

    # Rate limiter exists only when rate limiting is enabled
    self._rate_limiter: AsyncRateLimiter | None = (
        AsyncRateLimiter(
            rate=config.requests_per_second, burst=config.burst_limit
        )
        if config.enable_rate_limiting
        else None
    )

__aenter__ async

__aenter__() -> AsyncCocApiCore

Async context manager entry

Source code in cocapi/async_client.py
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
async def __aenter__(self) -> "AsyncCocApiCore":
    """Enter the async context, creating an HTTP client if none is set."""
    if self._client is not None:
        # A client is already present: reuse it and leave the
        # ownership flag exactly as it was.
        return self

    cfg = self.config
    pool_limits = httpx.Limits(
        max_connections=cfg.max_connections,
        max_keepalive_connections=cfg.max_keepalive_connections,
    )
    self._client = httpx.AsyncClient(
        timeout=cfg.timeout,
        limits=pool_limits,
        http2=True,
    )
    # This object created the client, so __aexit__ must close it.
    self._should_close_client = True
    return self

__aexit__ async

__aexit__(exc_type: Any, exc_val: Any, exc_tb: Any) -> None

Async context manager exit

Source code in cocapi/async_client.py
114
115
116
117
118
119
async def __aexit__(self, exc_type: Any, exc_val: Any, exc_tb: Any) -> None:
    """Leave the async context, closing the HTTP client if this object owns it."""
    owned_client = self._client if self._should_close_client else None
    if not owned_client:
        # Nothing to close: either no client, or it is not ours to close.
        return

    await owned_client.aclose()
    self._client = None
    self._should_close_client = False

set_key_manager_state

set_key_manager_state(
    email: str,
    password: str,
    key_name: str,
    key_count: int,
    auto_refresh: bool,
    persist_keys: bool = False,
    key_storage_path: str | None = None,
) -> None

Store key manager credentials for auto-refresh on 403.

Source code in cocapi/async_client.py
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
def set_key_manager_state(
    self,
    email: str,
    password: str,
    key_name: str,
    key_count: int,
    auto_refresh: bool,
    persist_keys: bool = False,
    key_storage_path: str | None = None,
) -> None:
    """Store key manager credentials for auto-refresh on 403."""
    self._km_email = email
    self._km_password = password
    self._km_key_name = key_name
    self._km_key_count = key_count
    self._km_auto_refresh = auto_refresh
    self._km_persist_keys = persist_keys
    self._km_key_storage_path = key_storage_path

make_request async

make_request(
    endpoint: str,
    params: dict[str, Any] | None = None,
    use_dynamic_model: bool = False,
    _refresh_attempted: bool = False,
) -> dict[str, Any]

Make an async API request

PARAMETER DESCRIPTION
endpoint

API endpoint path

TYPE: str

params

Request parameters

TYPE: dict[str, Any] | None DEFAULT: None

use_dynamic_model

Whether to create dynamic Pydantic models

TYPE: bool DEFAULT: False

RETURNS DESCRIPTION
dict[str, Any]

API response as dictionary

Source code in cocapi/async_client.py
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
async def make_request(
    self,
    endpoint: str,
    params: dict[str, Any] | None = None,
    use_dynamic_model: bool = False,
    _refresh_attempted: bool = False,
) -> dict[str, Any]:
    """
    Make an async API request

    Performs a GET with optional response caching, rate limiting,
    request/response middleware, metrics recording and retries with
    exponential backoff (``retry_delay * 2**attempt``).

    Args:
        endpoint: API endpoint path
        params: Request parameters
        use_dynamic_model: Whether to create dynamic Pydantic models
        _refresh_attempted: Internal flag, set on the single retry that
            follows a successful key refresh so the refresh happens at
            most once per call

    Returns:
        API response as dictionary, or the value returned by the matching
        ``_handle_*_error`` helper when the request ultimately fails
    """
    start_time = time.time()

    def _record_metric(
        status_code: int,
        response_time: float,
        cache_hit: bool = False,
        error_type: str | None = None,
    ) -> None:
        """Record one GET metric entry when metrics are enabled."""
        if not self.config.enable_metrics:
            return
        # Only pass error_type when there is one, as the success paths
        # rely on the tracker's own default.
        extra = {} if error_type is None else {"error_type": error_type}
        self.metrics.record_request(
            endpoint=endpoint,
            method="GET",
            status_code=status_code,
            response_time=response_time,
            cache_hit=cache_hit,
            **extra,
        )

    # Build URL
    url = build_url(self.config.base_url, endpoint, params)

    # The cache is looked up before middleware runs, so the entry must be
    # stored under the same pre-middleware key or it can never be found.
    cache_url, cache_params = url, params

    # Check cache first
    if self.config.enable_caching:
        cached_response = self.cache.get(cache_url, cache_params)
        if cached_response is not None:
            _record_metric(200, time.time() - start_time, cache_hit=True)

            # Apply dynamic model if requested
            if use_dynamic_model:
                cached_response = create_dynamic_model(cached_response, endpoint)

            return cached_response

    # Apply rate limiting
    if self._rate_limiter:
        await self._rate_limiter.acquire()

    # Apply request middleware. The query string is already part of `url`,
    # so the params returned by the middleware are not used further here.
    headers = self.headers.copy()
    url, headers, _ = self.middleware.apply_request_middleware(
        url, headers, params or {}
    )

    # Make the request with retries
    last_attempt = self.config.max_retries - 1
    for attempt in range(self.config.max_retries):
        try:
            # NOTE(review): this RuntimeError is raised inside the try, so
            # the generic handler below catches it and it is retried and
            # reported like a connection failure rather than propagating.
            if not self._client:
                raise RuntimeError(
                    "AsyncCocApiCore not initialized. Use 'async with' context manager."
                )

            response = await self._client.get(url, headers=headers)
            response_time = time.time() - start_time
            status = response.status_code

            if is_successful_response(status):
                # Parse JSON response
                try:
                    json_response = response.json()
                except Exception as e:
                    error_response = self._handle_json_error(e, attempt)
                    _record_metric(status, response_time, error_type="json")
                    return error_response

                # Apply response middleware
                json_response = self.middleware.apply_response_middleware(
                    json_response
                )

                # Cache successful response under the lookup key
                if self.config.enable_caching:
                    self.cache.set(cache_url, cache_params, json_response)

                _record_metric(status, response_time)

                # Apply dynamic model if requested
                if use_dynamic_model:
                    json_response = create_dynamic_model(json_response, endpoint)

                return json_response

            # Auto-refresh on accessDenied.invalidIp (once only)
            if (
                status == 403
                and not _refresh_attempted
                and self._should_auto_refresh_keys()
            ):
                try:
                    body = response.json()
                    if (
                        body.get("reason") == "accessDenied.invalidIp"
                        and await self._refresh_token()
                    ):
                        # Retry with the caller's original params so the
                        # request middleware is applied exactly once.
                        return await self.make_request(
                            endpoint,
                            params,
                            use_dynamic_model,
                            _refresh_attempted=True,
                        )
                except Exception:
                    # Auto-refresh is best-effort; fall through to normal error handling
                    pass

            # Handle HTTP errors
            error_response = self._handle_http_error(status, attempt)
            _record_metric(status, response_time, error_type="http")

            if not should_retry_error(status) or attempt >= last_attempt:
                return error_response

            # Wait before retry
            await asyncio.sleep(self.config.retry_delay * (2**attempt))

        except Exception as e:
            error_response = self._handle_network_error(e, attempt)
            response_time = time.time() - start_time

            # Timeouts are reported separately from every other failure
            failure_kind = (
                "timeout" if isinstance(e, httpx.TimeoutException) else "connection"
            )
            _record_metric(0, response_time, error_type=failure_kind)

            if attempt >= last_attempt:
                return error_response

            await asyncio.sleep(self.config.retry_delay * (2**attempt))

    # Reached only when max_retries is zero or negative (empty loop)
    return {
        "result": "error",
        "message": "Max retries exceeded",
        "error_type": "retry_exhausted",
    }

make_post_request async

make_post_request(
    endpoint: str,
    json_body: dict[str, Any],
    params: dict[str, Any] | None = None,
    use_dynamic_model: bool = False,
    _refresh_attempted: bool = False,
) -> dict[str, Any]

Make an async POST API request

PARAMETER DESCRIPTION
endpoint

API endpoint path

TYPE: str

json_body

JSON body to send with the POST request

TYPE: dict[str, Any]

params

Optional query parameters

TYPE: dict[str, Any] | None DEFAULT: None

use_dynamic_model

Whether to create dynamic Pydantic models

TYPE: bool DEFAULT: False

RETURNS DESCRIPTION
dict[str, Any]

API response as dictionary

Source code in cocapi/async_client.py
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
async def make_post_request(
    self,
    endpoint: str,
    json_body: dict[str, Any],
    params: dict[str, Any] | None = None,
    use_dynamic_model: bool = False,
    _refresh_attempted: bool = False,
) -> dict[str, Any]:
    """
    Make an async POST API request

    Applies rate limiting and request/response middleware, records
    metrics, and retries with exponential backoff
    (``retry_delay * 2**attempt``). POST responses are not cached.

    Args:
        endpoint: API endpoint path
        json_body: JSON body to send with the POST request
        params: Optional query parameters
        use_dynamic_model: Whether to create dynamic Pydantic models
        _refresh_attempted: Internal flag, set on the single retry that
            follows a successful key refresh so the refresh happens at
            most once per call

    Returns:
        API response as dictionary, or the value returned by the matching
        ``_handle_*_error`` helper when the request ultimately fails
    """
    start_time = time.time()

    def _record_metric(
        status_code: int,
        response_time: float,
        error_type: str | None = None,
    ) -> None:
        """Record one POST metric entry when metrics are enabled."""
        if not self.config.enable_metrics:
            return
        # Only pass error_type when there is one, as the success path
        # relies on the tracker's own default.
        extra = {} if error_type is None else {"error_type": error_type}
        self.metrics.record_request(
            endpoint=endpoint,
            method="POST",
            status_code=status_code,
            response_time=response_time,
            cache_hit=False,
            **extra,
        )

    # Apply rate limiting
    if self._rate_limiter:
        await self._rate_limiter.acquire()

    # Apply request middleware before building URL so middleware can modify params.
    # NOTE(review): the URL returned by the middleware is discarded; only
    # its headers and params are used. Confirm that is intended.
    headers = self.headers.copy()
    url_base = build_url(self.config.base_url, endpoint, None)
    _, headers, request_params = self.middleware.apply_request_middleware(
        url_base, headers, params or {}
    )
    # Rebuild URL with (potentially modified) params
    url = build_url(self.config.base_url, endpoint, request_params or None)

    # Make the request with retries
    last_attempt = self.config.max_retries - 1
    for attempt in range(self.config.max_retries):
        try:
            # NOTE(review): this RuntimeError is raised inside the try, so
            # the generic handler below catches it and it is retried and
            # reported like a connection failure rather than propagating.
            if not self._client:
                raise RuntimeError(
                    "AsyncCocApiCore not initialized. Use 'async with' context manager."
                )

            response = await self._client.post(url, headers=headers, json=json_body)
            response_time = time.time() - start_time
            status = response.status_code

            if is_successful_response(status):
                try:
                    json_response = response.json()
                except Exception as e:
                    error_response = self._handle_json_error(e, attempt)
                    _record_metric(status, response_time, error_type="json")
                    return error_response

                # Apply response middleware
                json_response = self.middleware.apply_response_middleware(
                    json_response
                )

                _record_metric(status, response_time)

                # Apply dynamic model if requested
                if use_dynamic_model:
                    json_response = create_dynamic_model(json_response, endpoint)

                return json_response

            # Auto-refresh on accessDenied.invalidIp (once only)
            if (
                status == 403
                and not _refresh_attempted
                and self._should_auto_refresh_keys()
            ):
                try:
                    body = response.json()
                    if (
                        body.get("reason") == "accessDenied.invalidIp"
                        and await self._refresh_token()
                    ):
                        # Retry with the caller's original params; passing
                        # the middleware output would transform them twice.
                        return await self.make_post_request(
                            endpoint,
                            json_body,
                            params,
                            use_dynamic_model,
                            _refresh_attempted=True,
                        )
                except Exception:
                    # Auto-refresh is best-effort; fall through to normal error handling
                    pass

            error_response = self._handle_http_error(status, attempt)
            _record_metric(status, response_time, error_type="http")

            if not should_retry_error(status) or attempt >= last_attempt:
                return error_response

            await asyncio.sleep(self.config.retry_delay * (2**attempt))

        except Exception as e:
            error_response = self._handle_network_error(e, attempt)
            response_time = time.time() - start_time

            # Timeouts are reported separately from every other failure
            failure_kind = (
                "timeout" if isinstance(e, httpx.TimeoutException) else "connection"
            )
            _record_metric(0, response_time, error_type=failure_kind)

            if attempt >= last_attempt:
                return error_response

            await asyncio.sleep(self.config.retry_delay * (2**attempt))

    # Reached only when max_retries is zero or negative (empty loop)
    return {
        "result": "error",
        "message": "Max retries exceeded",
        "error_type": "retry_exhausted",
    }

test_connection async

test_connection() -> dict[str, Any]

Test API connection

Source code in cocapi/async_client.py
599
600
601
602
603
604
605
606
607
608
609
610
611
async def test_connection(self) -> dict[str, Any]:
    """
    Check that the API is reachable by requesting ``/locations``.

    Returns the error dictionary from the request when it reports
    ``"result": "error"``, a success dictionary otherwise, and a
    connection-error dictionary if anything raises along the way.
    """
    try:
        probe = await self.make_request("/locations")
        request_failed = probe.get("result") == "error"
    except Exception as exc:
        return {
            "result": "error",
            "message": f"Connection test failed: {exc!s}",
            "error_type": "connection",
        }

    if request_failed:
        return probe
    return {"result": "success", "message": "API connection successful"}