10 changes: 6 additions & 4 deletions codecarbon/core/util.py
@@ -3,6 +3,7 @@
import subprocess
import sys
from contextlib import contextmanager
from functools import lru_cache
from os.path import expandvars
from pathlib import Path
from typing import Optional, Union
@@ -73,25 +74,26 @@ def backup(file_path: Union[str, Path], ext: Optional[str] = ".bak") -> None:
file_path.rename(backup_path)


def detect_cpu_model() -> str:
@lru_cache(maxsize=1)
def detect_cpu_model() -> Optional[str]:
cpu_info = cpuinfo.get_cpu_info()
if cpu_info:
cpu_model_detected = cpu_info.get("brand_raw", "")
return cpu_model_detected
return None


def is_mac_os() -> str:
def is_mac_os() -> bool:
system = sys.platform.lower()
return system.startswith("dar")


def is_windows_os() -> str:
def is_windows_os() -> bool:
system = sys.platform.lower()
return system.startswith("win")


def is_linux_os() -> str:
def is_linux_os() -> bool:
system = sys.platform.lower()
return system.startswith("lin")

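Context for the `codecarbon/core/util.py` change: `functools.lru_cache(maxsize=1)` memoizes the return value of an argument-less function, so the relatively slow `cpuinfo.get_cpu_info()` probe runs only once per process; the new test below relies on `cache_clear()` to reset that memoized value between runs. A minimal standalone sketch of the behaviour (illustration only, not part of this diff; the function name and return value are made up):

```python
# Illustration only: how @lru_cache(maxsize=1) memoizes an expensive,
# argument-less probe such as detect_cpu_model().
from functools import lru_cache

@lru_cache(maxsize=1)
def expensive_probe() -> str:
    print("probing hardware...")     # executes only on the first call
    return "Some CPU model"          # placeholder value

expensive_probe()                    # runs the body and caches the result
expensive_probe()                    # served from the cache; nothing printed
print(expensive_probe.cache_info())  # CacheInfo(hits=1, misses=1, maxsize=1, currsize=1)
```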
106 changes: 80 additions & 26 deletions codecarbon/input.py
@@ -1,12 +1,16 @@
"""
App configuration: This will likely change when we have a common location for data files
App configuration and static reference data loading.

Data files are static reference data that never change during runtime.
They are loaded once at module import to avoid repeated file I/O on the hot path
(e.g. start_task/stop_task calls).
"""

import atexit
import json
import sys
from contextlib import ExitStack
from typing import Dict
from typing import Any, Dict

import pandas as pd

@@ -18,6 +22,49 @@
from importlib_resources import files as importlib_resources_files


_CACHE: Dict[str, Any] = {}
_MODULE_NAME = "codecarbon"


def _get_resource_path(filepath: str):
"""Get filesystem path to a package resource file."""
file_manager = ExitStack()
atexit.register(file_manager.close)
ref = importlib_resources_files(_MODULE_NAME).joinpath(filepath)
path = file_manager.enter_context(importlib_resources_as_file(ref))
return path


def _load_static_data() -> None:
"""
Load all static reference data at module import.

Called once when codecarbon is imported. All data loaded here
is immutable and shared across all tracker instances.
"""
# Global energy mix - used for emissions calculations
path = _get_resource_path("data/private_infra/global_energy_mix.json")
with open(path) as f:
_CACHE["global_energy_mix"] = json.load(f)

# Cloud emissions data
path = _get_resource_path("data/cloud/impact.csv")
_CACHE["cloud_emissions"] = pd.read_csv(path)

# Carbon intensity per source
path = _get_resource_path("data/private_infra/carbon_intensity_per_source.json")
with open(path) as f:
_CACHE["carbon_intensity_per_source"] = json.load(f)

# CPU power data
path = _get_resource_path("data/hardware/cpu_power.csv")
_CACHE["cpu_power"] = pd.read_csv(path)


# Load static data at module import
_load_static_data()


class DataSource:
def __init__(self):
self.config = {
@@ -84,56 +131,63 @@ def cpu_power_path(self):

def get_global_energy_mix_data(self) -> Dict:
"""
Returns Global Energy Mix Data
Returns Global Energy Mix Data.
Data is pre-loaded at module import for performance.
"""
with open(self.global_energy_mix_data_path) as f:
global_energy_mix: Dict = json.load(f)
return global_energy_mix
return _CACHE["global_energy_mix"]

def get_cloud_emissions_data(self) -> pd.DataFrame:
"""
Returns Cloud Regions Impact Data
Returns Cloud Regions Impact Data.
Data is pre-loaded at module import for performance.
"""
return pd.read_csv(self.cloud_emissions_path)
return _CACHE["cloud_emissions"]

def get_country_emissions_data(self, country_iso_code: str) -> Dict:
"""
Returns Emissions Across Regions in a country
Returns Emissions Across Regions in a country.
Data is cached on first access per country.

:param country_iso_code: ISO code similar to one used in file names
:return: emissions in lbs/MWh and region code
"""
try:
with open(self.country_emissions_data_path(country_iso_code)) as f:
country_emissions_data: Dict = json.load(f)
return country_emissions_data
except KeyError:
# KeyError raised from line 39, when there is no data path specified for
# the given country
raise DataSourceException
cache_key = f"country_emissions_{country_iso_code}"
if cache_key not in _CACHE:
try:
with open(self.country_emissions_data_path(country_iso_code)) as f:
_CACHE[cache_key] = json.load(f)
except KeyError:
# KeyError raised when there is no data path specified for the country
raise DataSourceException
return _CACHE[cache_key]

def get_country_energy_mix_data(self, country_iso_code: str) -> Dict:
"""
Returns Energy Mix Across Regions in a country
Returns Energy Mix Across Regions in a country.
Data is cached on first access per country.

:param country_iso_code: ISO code similar to one used in file names
:return: energy mix by region code
"""
with open(self.country_energy_mix_data_path(country_iso_code)) as f:
country_energy_mix_data: Dict = json.load(f)
return country_energy_mix_data
cache_key = f"country_energy_mix_{country_iso_code}"
if cache_key not in _CACHE:
with open(self.country_energy_mix_data_path(country_iso_code)) as f:
_CACHE[cache_key] = json.load(f)
return _CACHE[cache_key]

def get_carbon_intensity_per_source_data(self) -> Dict:
"""
Returns Carbon intensity per source. In gCO2.eq/kWh.
Data is pre-loaded at module import for performance.
"""
with open(self.carbon_intensity_per_source_path) as f:
carbon_intensity_per_source: Dict = json.load(f)
return carbon_intensity_per_source
return _CACHE["carbon_intensity_per_source"]

def get_cpu_power_data(self) -> pd.DataFrame:
"""
Returns CPU power Data
Returns CPU power Data.
Data is pre-loaded at module import for performance.
"""
return pd.read_csv(self.cpu_power_path)
return _CACHE["cpu_power"]


class DataSourceException(Exception):
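Context for the `codecarbon/input.py` change: because `_CACHE` lives at module level, every `DataSource` instance hands back the same pre-loaded objects, so the getters on the hot path no longer touch the filesystem. A minimal usage sketch, assuming the `DataSource` API exactly as shown in the diff above (illustration only, not part of this diff):

```python
# Illustration only: repeated getter calls return the same cached objects.
from codecarbon.input import DataSource

ds = DataSource()
mix_a = ds.get_global_energy_mix_data()   # served from _CACHE, no file I/O
mix_b = ds.get_global_energy_mix_data()
assert mix_a is mix_b                      # same object, not a fresh read

# Country-level data is lazy: read from disk on the first request, cached after
# ("usa" is the ISO code the new test in this PR uses).
usa = ds.get_country_emissions_data("usa")
assert usa is ds.get_country_emissions_data("usa")
```

One consequence of returning the shared objects directly is that callers must treat them as read-only: mutating the returned dict or DataFrame would affect every other consumer of the cache.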
29 changes: 28 additions & 1 deletion tests/test_core_util.py
@@ -1,7 +1,34 @@
import shutil
import tempfile

from codecarbon.core.util import backup, resolve_path
from codecarbon.core.util import backup, detect_cpu_model, resolve_path


def test_detect_cpu_model_caching():
"""Test that detect_cpu_model() results are cached."""
# Clear cache to ensure clean state
detect_cpu_model.cache_clear()

# First call should populate cache
result1 = detect_cpu_model()
cache_info1 = detect_cpu_model.cache_info()
assert cache_info1.hits == 0
assert cache_info1.misses == 1

# Second call should hit cache
result2 = detect_cpu_model()
cache_info2 = detect_cpu_model.cache_info()
assert cache_info2.hits == 1
assert cache_info2.misses == 1

# Results should be identical
assert result1 == result2

# Third call should also hit cache
detect_cpu_model()
cache_info3 = detect_cpu_model.cache_info()
assert cache_info3.hits == 2
assert cache_info3.misses == 1


def test_backup():
98 changes: 98 additions & 0 deletions tests/test_input.py
@@ -0,0 +1,98 @@
"""
Tests for codecarbon/input.py module-level caching.

The caching mechanism loads static reference data once at module import
to avoid file I/O on the hot path (start_task/stop_task).
"""

import unittest


class TestDataSourceCaching(unittest.TestCase):
"""Test that DataSource uses module-level cache for static data."""

def test_cache_populated_at_import(self):
"""Verify that _CACHE is populated when module is imported."""
from codecarbon.input import _CACHE

# All static data should be pre-loaded
self.assertIn("global_energy_mix", _CACHE)
self.assertIn("cloud_emissions", _CACHE)
self.assertIn("carbon_intensity_per_source", _CACHE)
self.assertIn("cpu_power", _CACHE)

# Verify data is non-empty
self.assertGreater(len(_CACHE["global_energy_mix"]), 0)
self.assertGreater(len(_CACHE["cloud_emissions"]), 0)
self.assertGreater(len(_CACHE["carbon_intensity_per_source"]), 0)
self.assertGreater(len(_CACHE["cpu_power"]), 0)

def test_get_global_energy_mix_returns_cached_data(self):
"""Verify get_global_energy_mix_data() returns cached object."""
from codecarbon.input import _CACHE, DataSource

ds = DataSource()
data = ds.get_global_energy_mix_data()

# Should return the exact same object from cache
self.assertIs(data, _CACHE["global_energy_mix"])

def test_get_cloud_emissions_returns_cached_data(self):
"""Verify get_cloud_emissions_data() returns cached object."""
from codecarbon.input import _CACHE, DataSource

ds = DataSource()
data = ds.get_cloud_emissions_data()

# Should return the exact same object from cache
self.assertIs(data, _CACHE["cloud_emissions"])

def test_get_carbon_intensity_returns_cached_data(self):
"""Verify get_carbon_intensity_per_source_data() returns cached object."""
from codecarbon.input import _CACHE, DataSource

ds = DataSource()
data = ds.get_carbon_intensity_per_source_data()

# Should return the exact same object from cache
self.assertIs(data, _CACHE["carbon_intensity_per_source"])

def test_get_cpu_power_returns_cached_data(self):
"""Verify get_cpu_power_data() returns cached object."""
from codecarbon.input import _CACHE, DataSource

ds = DataSource()
data = ds.get_cpu_power_data()

# Should return the exact same object from cache
self.assertIs(data, _CACHE["cpu_power"])

def test_country_data_lazy_loaded(self):
"""Verify country-specific data is lazy-loaded and cached."""
from codecarbon.input import _CACHE, DataSource

ds = DataSource()
cache_key = "country_emissions_usa"

# USA data may or may not be cached depending on prior test runs
# Just verify that after calling, it IS cached
data = ds.get_country_emissions_data("usa")
self.assertIn(cache_key, _CACHE)
self.assertIs(data, _CACHE[cache_key])

def test_multiple_datasource_instances_share_cache(self):
"""Verify that multiple DataSource instances share the same cache."""
from codecarbon.input import DataSource

ds1 = DataSource()
ds2 = DataSource()

# Both instances should return the same cached object
data1 = ds1.get_global_energy_mix_data()
data2 = ds2.get_global_energy_mix_data()

self.assertIs(data1, data2)


if __name__ == "__main__":
unittest.main()