Performance Optimization

Lazy Loading Strategies
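
The pattern below wraps YAPFileManager in a small in-memory cache with a time-to-live (TTL): reads are served from memory, and the file is only re-read once the TTL expires or the cache is explicitly invalidated.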

from yapfm import YAPFileManager
from typing import Any, Dict, Optional
import threading
import time

class LazyFileManager:
    """File manager with lazy loading and caching."""

    def __init__(self, path: str, cache_ttl: int = 300):
        self.path = path
        self.cache_ttl = cache_ttl
        self._cache: Optional[Dict[str, Any]] = None
        self._cache_timestamp: Optional[float] = None
        self._lock = threading.RLock()
        self._fm = YAPFileManager(path, auto_create=True)

    def _is_cache_valid(self) -> bool:
        """Check if cache is still valid."""
        if self._cache is None or self._cache_timestamp is None:
            return False

        return time.time() - self._cache_timestamp < self.cache_ttl

    def _load_data(self) -> Dict[str, Any]:
        """Load data from file."""
        with self._fm:
            return self._fm.data.copy()

    def get_data(self) -> Dict[str, Any]:
        """Get data with lazy loading."""
        with self._lock:
            if not self._is_cache_valid():
                self._cache = self._load_data()
                self._cache_timestamp = time.time()

            return self._cache.copy()

    def get_key(self, dot_key: str, default: Any = None) -> Any:
        """Get a key with lazy loading."""
        data = self.get_data()

        # Navigate through nested keys
        keys = dot_key.split('.')
        value = data

        for key in keys:
            if isinstance(value, dict) and key in value:
                value = value[key]
            else:
                return default

        return value

    def invalidate_cache(self) -> None:
        """Invalidate the cache."""
        with self._lock:
            self._cache = None
            self._cache_timestamp = None

# Usage
lazy_fm = LazyFileManager("config.json", cache_ttl=60)  # 1 minute cache

# First access loads from file
data = lazy_fm.get_data()

# Subsequent accesses use cache
key_value = lazy_fm.get_key("database.host")
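
If the underlying file can be changed by another process, a TTL-based cache may briefly serve stale data. Invalidating the cache forces a fresh read on the next access:

# Force a reload after the file is modified externally
lazy_fm.invalidate_cache()
fresh_data = lazy_fm.get_data()  # Re-reads config.json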

Memory-Efficient Processing
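
Loading a large file and then touching every key at once can cause memory spikes in downstream code. The class below splits the loaded data into fixed-size chunks for batch processing and exposes a generator that flattens nested keys into dot notation, yielding one (key, value) pair at a time.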

from yapfm import YAPFileManager
from typing import Any, Callable, Dict, Iterator, Tuple

class StreamingFileManager:
    """File manager for processing large files in chunks."""

    def __init__(self, path: str, chunk_size: int = 1000):
        self.path = path
        self.chunk_size = chunk_size
        self._fm = YAPFileManager(path, auto_create=True)

    def process_large_data(self, processor: Callable[[Dict[str, Any]], None]) -> None:
        """Process large data in chunks."""
        with self._fm:
            data = self._fm.data

            # Process data in chunks
            items = list(data.items())
            for i in range(0, len(items), self.chunk_size):
                chunk = dict(items[i:i + self.chunk_size])
                processor(chunk)

    def stream_keys(self) -> Iterator[Tuple[str, Any]]:
        """Stream keys and values for memory-efficient processing."""
        with self._fm:
            data = self._fm.data

            def _stream_dict(d: Dict[str, Any], prefix: str = "") -> Iterator[tuple]:
                for key, value in d.items():
                    full_key = f"{prefix}.{key}" if prefix else key

                    if isinstance(value, dict):
                        yield from _stream_dict(value, full_key)
                    else:
                        yield (full_key, value)

            yield from _stream_dict(data)

    def batch_update(self, updates: Dict[str, Any]) -> None:
        """Update multiple keys efficiently."""
        with self._fm:
            # Get current data
            current_data = self._fm.data.copy()

            # Apply updates
            for key, value in updates.items():
                keys = key.split('.')
                target = current_data

                # Navigate to target
                for k in keys[:-1]:
                    # Overwrite non-dict intermediates so the path stays writable
                    if not isinstance(target.get(k), dict):
                        target[k] = {}
                    target = target[k]

                # Set value
                target[keys[-1]] = value

            # Save updated data
            self._fm.data = current_data

# Usage
streaming_fm = StreamingFileManager("large_config.json")

# Process large data
def process_chunk(chunk: Dict[str, Any]) -> None:
    print(f"Processing chunk with {len(chunk)} items")

streaming_fm.process_large_data(process_chunk)

# Stream keys for memory-efficient processing
for key, value in streaming_fm.stream_keys():
    print(f"{key}: {value}")
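
batch_update is worth a short illustration as well; the keys and values here are made up for the example:

# Apply several dot-notation updates in a single load/save cycle
streaming_fm.batch_update({
    "database.host": "db.example.com",
    "database.port": 5432,
    "cache.enabled": True,
})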