diff --git a/.github/workflows/tests.yml b/.github/workflows/tests.yml deleted file mode 100755 index da2b046..0000000 --- a/.github/workflows/tests.yml +++ /dev/null @@ -1,41 +0,0 @@ -name: Run Tests - -on: - push: - branches: ["**"] - - -jobs: - test: - name: Test Python ${{ matrix.python-version }} - runs-on: ubuntu-latest - strategy: - fail-fast: false - matrix: - python-version: ["3.12", "3.13"] - steps: - - uses: actions/checkout@v4 - - - name: Set up Python ${{ matrix.python-version }} - uses: actions/setup-python@v5 - with: - python-version: ${{ matrix.python-version }} - - - name: Install Poetry - uses: snok/install-poetry@v1 - with: - virtualenvs-create: true - virtualenvs-in-project: true - - - name: Install dependencies - run: poetry install --with test --no-interaction --no-ansi - - - name: Run tests with coverage - run: poetry run poe tests - - - name: Upload coverage reports to Codecov - if: matrix.python-version == '3.13' - uses: codecov/codecov-action@v5 - with: - token: ${{ secrets.CODECOV_TOKEN }} - slug: ddc/pythonLogs diff --git a/.github/workflows/workflow.yml b/.github/workflows/workflow.yml index 5576340..ebfd27f 100755 --- a/.github/workflows/workflow.yml +++ b/.github/workflows/workflow.yml @@ -1,20 +1,93 @@ name: CI/CD Pipeline -on: + +"on": push: - branches: [main, master] + branches: ["**"] tags: ['v*'] jobs: + test: + name: Test Python ${{ matrix.python-version }} on ${{ matrix.os }} + runs-on: ${{ matrix.runs-on || matrix.os }} + strategy: + fail-fast: false + matrix: + os: ["ubuntu-latest", "macos-latest", "windows-latest"] + python-version: ["3.12", "3.13"] + include: + - os: "macos-14-arm64" + runs-on: "macos-14" + python-version: "3.12" + - os: "macos-14-arm64" + runs-on: "macos-14" + python-version: "3.13" + - os: "ubuntu-latest-arm64" + runs-on: "ubuntu-latest" + python-version: "3.12" + arch: "arm64" + - os: "ubuntu-latest-arm64" + runs-on: "ubuntu-latest" + python-version: "3.13" + arch: "arm64" + steps: + - uses: actions/checkout@v4 + + - name: Set up QEMU for ARM64 emulation + if: matrix.arch == 'arm64' + uses: docker/setup-qemu-action@v3 + with: + platforms: arm64 + + - name: Set up Python ${{ matrix.python-version }} + uses: actions/setup-python@v5 + with: + python-version: ${{ matrix.python-version }} + + - name: Install Poetry + uses: snok/install-poetry@v1 + with: + virtualenvs-create: true + virtualenvs-in-project: true + + - name: Install dependencies + run: poetry install --with test --no-interaction --no-ansi + shell: bash + + - name: Run tests with coverage + run: poetry run poe tests + shell: bash + + - name: Upload coverage to Codecov + if: matrix.python-version == '3.13' && matrix.os == 'ubuntu-latest' + uses: codecov/codecov-action@v5 + with: + token: ${{ secrets.CODECOV_TOKEN }} + slug: ddc/pythonLogs + + - name: Upload test results to Codecov + if: matrix.python-version == '3.13' && matrix.os == 'ubuntu-latest' + uses: codecov/test-results-action@v1 + with: + token: ${{ secrets.CODECOV_TOKEN }} + slug: ddc/pythonLogs + build: name: Build for Python ${{ matrix.python-version }} on ${{ matrix.os }} - runs-on: ${{ matrix.os }} + runs-on: ${{ matrix.runs-on || matrix.os }} if: startsWith(github.ref, 'refs/tags/v') strategy: matrix: - os: [ubuntu-latest, windows-latest, macos-latest] + os: ["ubuntu-latest", "macos-latest", "windows-latest"] python-version: ["3.12", "3.13"] + include: + - os: "macos-14-arm64" + runs-on: "macos-14" + python-version: "3.12" + - os: "macos-14-arm64" + runs-on: "macos-14" + python-version: "3.13" steps: - uses: actions/checkout@v4 @@ -30,7 +103,12 @@ jobs: virtualenvs-in-project: true - name: Install build dependencies only - run: poetry install --only main --no-interaction --no-ansi + run: | + if [[ "${{ matrix.os }}" == "windows-latest" ]]; then + poetry install --only main --no-interaction --no-ansi -E mongodb -E mssql -E mysql -E oracle + else + poetry install --only main --no-interaction --no-ansi -E all + fi shell: bash - name: Build package with custom build script @@ -53,10 +131,79 @@ jobs: path: dist-py${{ matrix.python-version }}-${{ matrix.os }}/ retention-days: 7 + build-linux-arm64: + name: Build Linux ARM64 wheels + runs-on: ubuntu-latest + if: startsWith(github.ref, 'refs/tags/v') + strategy: + matrix: + python-version: ["3.12", "3.13"] + steps: + - uses: actions/checkout@v4 + + - name: Set up Docker Buildx + uses: docker/setup-buildx-action@v3 + with: + platforms: linux/arm64 + + - name: Build ARM64 wheel using Docker + run: | + # Create a multi-stage Dockerfile for ARM64 builds + cat << 'EOF' > Dockerfile.arm64 + FROM python:${{ matrix.python-version }}-slim + + WORKDIR /build + + # Install build dependencies + RUN apt-get update && apt-get install -y \ + build-essential \ + git \ + && rm -rf /var/lib/apt/lists/* + + # Install Poetry + RUN pip install poetry + + # Copy project files + COPY . . + + # Configure Poetry and build + RUN poetry config virtualenvs.create false \ + && poetry install --only main --no-interaction --no-ansi \ + && poetry run python build.py \ + && poetry build + + # Copy artifacts to output volume + CMD ["cp", "-r", "dist/", "/output/"] + EOF + + # Build using buildx for ARM64 + docker buildx build \ + --platform linux/arm64 \ + --file Dockerfile.arm64 \ + --tag pythonlogs-arm64-builder:${{ matrix.python-version }} \ + --load \ + . + + # Create output directory + mkdir -p dist-arm64-py${{ matrix.python-version }} + + # Run container to extract artifacts + docker run --rm \ + --platform linux/arm64 \ + -v $(pwd)/dist-arm64-py${{ matrix.python-version }}:/output \ + pythonlogs-arm64-builder:${{ matrix.python-version }} + + - name: Upload Linux ARM64 Python ${{ matrix.python-version }} artifacts + uses: actions/upload-artifact@v4 + with: + name: python-packages-${{ matrix.python-version }}-linux-arm64 + path: dist-arm64-py${{ matrix.python-version }}/ + retention-days: 7 + release: name: Create Release runs-on: ubuntu-latest - needs: build + needs: [build, build-linux-arm64] if: startsWith(github.ref, 'refs/tags/v') permissions: contents: write diff --git a/.gitignore b/.gitignore index 85df18b..836de51 100755 --- a/.gitignore +++ b/.gitignore @@ -162,3 +162,4 @@ cython_debug/ /profile_fixed.prof /profile_output.prof /profile_pytest.prof +/junit.xml diff --git a/poetry.lock b/poetry.lock index 0123c0f..31c73b1 100644 --- a/poetry.lock +++ b/poetry.lock @@ -27,79 +27,100 @@ files = [ [[package]] name = "coverage" -version = "7.9.2" +version = "7.10.0" description = "Code coverage measurement for Python" optional = false python-versions = ">=3.9" groups = ["test"] files = [ - {file = "coverage-7.9.2-cp310-cp310-macosx_10_9_x86_64.whl", hash = "sha256:66283a192a14a3854b2e7f3418d7db05cdf411012ab7ff5db98ff3b181e1f912"}, - {file = "coverage-7.9.2-cp310-cp310-macosx_11_0_arm64.whl", hash = "sha256:4e01d138540ef34fcf35c1aa24d06c3de2a4cffa349e29a10056544f35cca15f"}, - {file = "coverage-7.9.2-cp310-cp310-manylinux_2_17_aarch64.manylinux2014_aarch64.whl", hash = "sha256:f22627c1fe2745ee98d3ab87679ca73a97e75ca75eb5faee48660d060875465f"}, - {file = "coverage-7.9.2-cp310-cp310-manylinux_2_5_i686.manylinux1_i686.manylinux_2_17_i686.manylinux2014_i686.whl", hash = "sha256:4b1c2d8363247b46bd51f393f86c94096e64a1cf6906803fa8d5a9d03784bdbf"}, - {file = "coverage-7.9.2-cp310-cp310-manylinux_2_5_x86_64.manylinux1_x86_64.manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:c10c882b114faf82dbd33e876d0cbd5e1d1ebc0d2a74ceef642c6152f3f4d547"}, - {file = "coverage-7.9.2-cp310-cp310-musllinux_1_2_aarch64.whl", hash = "sha256:de3c0378bdf7066c3988d66cd5232d161e933b87103b014ab1b0b4676098fa45"}, - {file = "coverage-7.9.2-cp310-cp310-musllinux_1_2_i686.whl", hash = "sha256:1e2f097eae0e5991e7623958a24ced3282676c93c013dde41399ff63e230fcf2"}, - {file = "coverage-7.9.2-cp310-cp310-musllinux_1_2_x86_64.whl", hash = "sha256:28dc1f67e83a14e7079b6cea4d314bc8b24d1aed42d3582ff89c0295f09b181e"}, - {file = "coverage-7.9.2-cp310-cp310-win32.whl", hash = "sha256:bf7d773da6af9e10dbddacbf4e5cab13d06d0ed93561d44dae0188a42c65be7e"}, - {file = "coverage-7.9.2-cp310-cp310-win_amd64.whl", hash = "sha256:0c0378ba787681ab1897f7c89b415bd56b0b2d9a47e5a3d8dc0ea55aac118d6c"}, - {file = "coverage-7.9.2-cp311-cp311-macosx_10_9_x86_64.whl", hash = "sha256:a7a56a2964a9687b6aba5b5ced6971af308ef6f79a91043c05dd4ee3ebc3e9ba"}, - {file = "coverage-7.9.2-cp311-cp311-macosx_11_0_arm64.whl", hash = "sha256:123d589f32c11d9be7fe2e66d823a236fe759b0096f5db3fb1b75b2fa414a4fa"}, - {file = "coverage-7.9.2-cp311-cp311-manylinux_2_17_aarch64.manylinux2014_aarch64.whl", hash = "sha256:333b2e0ca576a7dbd66e85ab402e35c03b0b22f525eed82681c4b866e2e2653a"}, - {file = "coverage-7.9.2-cp311-cp311-manylinux_2_5_i686.manylinux1_i686.manylinux_2_17_i686.manylinux2014_i686.whl", hash = "sha256:326802760da234baf9f2f85a39e4a4b5861b94f6c8d95251f699e4f73b1835dc"}, - {file = "coverage-7.9.2-cp311-cp311-manylinux_2_5_x86_64.manylinux1_x86_64.manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:19e7be4cfec248df38ce40968c95d3952fbffd57b400d4b9bb580f28179556d2"}, - {file = "coverage-7.9.2-cp311-cp311-musllinux_1_2_aarch64.whl", hash = "sha256:0b4a4cb73b9f2b891c1788711408ef9707666501ba23684387277ededab1097c"}, - {file = "coverage-7.9.2-cp311-cp311-musllinux_1_2_i686.whl", hash = "sha256:2c8937fa16c8c9fbbd9f118588756e7bcdc7e16a470766a9aef912dd3f117dbd"}, - {file = "coverage-7.9.2-cp311-cp311-musllinux_1_2_x86_64.whl", hash = "sha256:42da2280c4d30c57a9b578bafd1d4494fa6c056d4c419d9689e66d775539be74"}, - {file = "coverage-7.9.2-cp311-cp311-win32.whl", hash = "sha256:14fa8d3da147f5fdf9d298cacc18791818f3f1a9f542c8958b80c228320e90c6"}, - {file = "coverage-7.9.2-cp311-cp311-win_amd64.whl", hash = "sha256:549cab4892fc82004f9739963163fd3aac7a7b0df430669b75b86d293d2df2a7"}, - {file = "coverage-7.9.2-cp311-cp311-win_arm64.whl", hash = "sha256:c2667a2b913e307f06aa4e5677f01a9746cd08e4b35e14ebcde6420a9ebb4c62"}, - {file = "coverage-7.9.2-cp312-cp312-macosx_10_13_x86_64.whl", hash = "sha256:ae9eb07f1cfacd9cfe8eaee6f4ff4b8a289a668c39c165cd0c8548484920ffc0"}, - {file = "coverage-7.9.2-cp312-cp312-macosx_11_0_arm64.whl", hash = "sha256:9ce85551f9a1119f02adc46d3014b5ee3f765deac166acf20dbb851ceb79b6f3"}, - {file = "coverage-7.9.2-cp312-cp312-manylinux_2_17_aarch64.manylinux2014_aarch64.whl", hash = "sha256:f8f6389ac977c5fb322e0e38885fbbf901743f79d47f50db706e7644dcdcb6e1"}, - {file = "coverage-7.9.2-cp312-cp312-manylinux_2_5_i686.manylinux1_i686.manylinux_2_17_i686.manylinux2014_i686.whl", hash = "sha256:ff0d9eae8cdfcd58fe7893b88993723583a6ce4dfbfd9f29e001922544f95615"}, - {file = "coverage-7.9.2-cp312-cp312-manylinux_2_5_x86_64.manylinux1_x86_64.manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:fae939811e14e53ed8a9818dad51d434a41ee09df9305663735f2e2d2d7d959b"}, - {file = "coverage-7.9.2-cp312-cp312-musllinux_1_2_aarch64.whl", hash = "sha256:31991156251ec202c798501e0a42bbdf2169dcb0f137b1f5c0f4267f3fc68ef9"}, - {file = "coverage-7.9.2-cp312-cp312-musllinux_1_2_i686.whl", hash = "sha256:d0d67963f9cbfc7c7f96d4ac74ed60ecbebd2ea6eeb51887af0f8dce205e545f"}, - {file = "coverage-7.9.2-cp312-cp312-musllinux_1_2_x86_64.whl", hash = "sha256:49b752a2858b10580969ec6af6f090a9a440a64a301ac1528d7ca5f7ed497f4d"}, - {file = "coverage-7.9.2-cp312-cp312-win32.whl", hash = "sha256:88d7598b8ee130f32f8a43198ee02edd16d7f77692fa056cb779616bbea1b355"}, - {file = "coverage-7.9.2-cp312-cp312-win_amd64.whl", hash = "sha256:9dfb070f830739ee49d7c83e4941cc767e503e4394fdecb3b54bfdac1d7662c0"}, - {file = "coverage-7.9.2-cp312-cp312-win_arm64.whl", hash = "sha256:4e2c058aef613e79df00e86b6d42a641c877211384ce5bd07585ed7ba71ab31b"}, - {file = "coverage-7.9.2-cp313-cp313-macosx_10_13_x86_64.whl", hash = "sha256:985abe7f242e0d7bba228ab01070fde1d6c8fa12f142e43debe9ed1dde686038"}, - {file = "coverage-7.9.2-cp313-cp313-macosx_11_0_arm64.whl", hash = "sha256:82c3939264a76d44fde7f213924021ed31f55ef28111a19649fec90c0f109e6d"}, - {file = "coverage-7.9.2-cp313-cp313-manylinux_2_17_aarch64.manylinux2014_aarch64.whl", hash = "sha256:ae5d563e970dbe04382f736ec214ef48103d1b875967c89d83c6e3f21706d5b3"}, - {file = "coverage-7.9.2-cp313-cp313-manylinux_2_5_i686.manylinux1_i686.manylinux_2_17_i686.manylinux2014_i686.whl", hash = "sha256:bdd612e59baed2a93c8843c9a7cb902260f181370f1d772f4842987535071d14"}, - {file = "coverage-7.9.2-cp313-cp313-manylinux_2_5_x86_64.manylinux1_x86_64.manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:256ea87cb2a1ed992bcdfc349d8042dcea1b80436f4ddf6e246d6bee4b5d73b6"}, - {file = "coverage-7.9.2-cp313-cp313-musllinux_1_2_aarch64.whl", hash = "sha256:f44ae036b63c8ea432f610534a2668b0c3aee810e7037ab9d8ff6883de480f5b"}, - {file = "coverage-7.9.2-cp313-cp313-musllinux_1_2_i686.whl", hash = "sha256:82d76ad87c932935417a19b10cfe7abb15fd3f923cfe47dbdaa74ef4e503752d"}, - {file = "coverage-7.9.2-cp313-cp313-musllinux_1_2_x86_64.whl", hash = "sha256:619317bb86de4193debc712b9e59d5cffd91dc1d178627ab2a77b9870deb2868"}, - {file = "coverage-7.9.2-cp313-cp313-win32.whl", hash = "sha256:0a07757de9feb1dfafd16ab651e0f628fd7ce551604d1bf23e47e1ddca93f08a"}, - {file = "coverage-7.9.2-cp313-cp313-win_amd64.whl", hash = "sha256:115db3d1f4d3f35f5bb021e270edd85011934ff97c8797216b62f461dd69374b"}, - {file = "coverage-7.9.2-cp313-cp313-win_arm64.whl", hash = "sha256:48f82f889c80af8b2a7bb6e158d95a3fbec6a3453a1004d04e4f3b5945a02694"}, - {file = "coverage-7.9.2-cp313-cp313t-macosx_10_13_x86_64.whl", hash = "sha256:55a28954545f9d2f96870b40f6c3386a59ba8ed50caf2d949676dac3ecab99f5"}, - {file = "coverage-7.9.2-cp313-cp313t-macosx_11_0_arm64.whl", hash = "sha256:cdef6504637731a63c133bb2e6f0f0214e2748495ec15fe42d1e219d1b133f0b"}, - {file = "coverage-7.9.2-cp313-cp313t-manylinux_2_17_aarch64.manylinux2014_aarch64.whl", hash = "sha256:bcd5ebe66c7a97273d5d2ddd4ad0ed2e706b39630ed4b53e713d360626c3dbb3"}, - {file = "coverage-7.9.2-cp313-cp313t-manylinux_2_5_i686.manylinux1_i686.manylinux_2_17_i686.manylinux2014_i686.whl", hash = "sha256:9303aed20872d7a3c9cb39c5d2b9bdbe44e3a9a1aecb52920f7e7495410dfab8"}, - {file = "coverage-7.9.2-cp313-cp313t-manylinux_2_5_x86_64.manylinux1_x86_64.manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:bc18ea9e417a04d1920a9a76fe9ebd2f43ca505b81994598482f938d5c315f46"}, - {file = "coverage-7.9.2-cp313-cp313t-musllinux_1_2_aarch64.whl", hash = "sha256:6406cff19880aaaadc932152242523e892faff224da29e241ce2fca329866584"}, - {file = "coverage-7.9.2-cp313-cp313t-musllinux_1_2_i686.whl", hash = "sha256:2d0d4f6ecdf37fcc19c88fec3e2277d5dee740fb51ffdd69b9579b8c31e4232e"}, - {file = "coverage-7.9.2-cp313-cp313t-musllinux_1_2_x86_64.whl", hash = "sha256:c33624f50cf8de418ab2b4d6ca9eda96dc45b2c4231336bac91454520e8d1fac"}, - {file = "coverage-7.9.2-cp313-cp313t-win32.whl", hash = "sha256:1df6b76e737c6a92210eebcb2390af59a141f9e9430210595251fbaf02d46926"}, - {file = "coverage-7.9.2-cp313-cp313t-win_amd64.whl", hash = "sha256:f5fd54310b92741ebe00d9c0d1d7b2b27463952c022da6d47c175d246a98d1bd"}, - {file = "coverage-7.9.2-cp313-cp313t-win_arm64.whl", hash = "sha256:c48c2375287108c887ee87d13b4070a381c6537d30e8487b24ec721bf2a781cb"}, - {file = "coverage-7.9.2-cp39-cp39-macosx_10_9_x86_64.whl", hash = "sha256:ddc39510ac922a5c4c27849b739f875d3e1d9e590d1e7b64c98dadf037a16cce"}, - {file = "coverage-7.9.2-cp39-cp39-macosx_11_0_arm64.whl", hash = "sha256:a535c0c7364acd55229749c2b3e5eebf141865de3a8f697076a3291985f02d30"}, - {file = "coverage-7.9.2-cp39-cp39-manylinux_2_17_aarch64.manylinux2014_aarch64.whl", hash = "sha256:df0f9ef28e0f20c767ccdccfc5ae5f83a6f4a2fbdfbcbcc8487a8a78771168c8"}, - {file = "coverage-7.9.2-cp39-cp39-manylinux_2_5_i686.manylinux1_i686.manylinux_2_17_i686.manylinux2014_i686.whl", hash = "sha256:2f3da12e0ccbcb348969221d29441ac714bbddc4d74e13923d3d5a7a0bebef7a"}, - {file = "coverage-7.9.2-cp39-cp39-manylinux_2_5_x86_64.manylinux1_x86_64.manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:0a17eaf46f56ae0f870f14a3cbc2e4632fe3771eab7f687eda1ee59b73d09fe4"}, - {file = "coverage-7.9.2-cp39-cp39-musllinux_1_2_aarch64.whl", hash = "sha256:669135a9d25df55d1ed56a11bf555f37c922cf08d80799d4f65d77d7d6123fcf"}, - {file = "coverage-7.9.2-cp39-cp39-musllinux_1_2_i686.whl", hash = "sha256:9d3a700304d01a627df9db4322dc082a0ce1e8fc74ac238e2af39ced4c083193"}, - {file = "coverage-7.9.2-cp39-cp39-musllinux_1_2_x86_64.whl", hash = "sha256:71ae8b53855644a0b1579d4041304ddc9995c7b21c8a1f16753c4d8903b4dfed"}, - {file = "coverage-7.9.2-cp39-cp39-win32.whl", hash = "sha256:dd7a57b33b5cf27acb491e890720af45db05589a80c1ffc798462a765be6d4d7"}, - {file = "coverage-7.9.2-cp39-cp39-win_amd64.whl", hash = "sha256:f65bb452e579d5540c8b37ec105dd54d8b9307b07bcaa186818c104ffda22441"}, - {file = "coverage-7.9.2-pp39.pp310.pp311-none-any.whl", hash = "sha256:8a1166db2fb62473285bcb092f586e081e92656c7dfa8e9f62b4d39d7e6b5050"}, - {file = "coverage-7.9.2-py3-none-any.whl", hash = "sha256:e425cd5b00f6fc0ed7cdbd766c70be8baab4b7839e4d4fe5fac48581dd968ea4"}, - {file = "coverage-7.9.2.tar.gz", hash = "sha256:997024fa51e3290264ffd7492ec97d0690293ccd2b45a6cd7d82d945a4a80c8b"}, + {file = "coverage-7.10.0-cp310-cp310-macosx_10_9_x86_64.whl", hash = "sha256:cbd823f7ea5286c26406ad9e54268544d82f3d1cadb6d4f3b85e9877f0cab1ef"}, + {file = "coverage-7.10.0-cp310-cp310-macosx_11_0_arm64.whl", hash = "sha256:ab3f7a5dbaab937df0b9e9e8ec6eab235ba9a6f29d71fd3b24335affaed886cc"}, + {file = "coverage-7.10.0-cp310-cp310-manylinux1_i686.manylinux_2_28_i686.manylinux_2_5_i686.whl", hash = "sha256:8c63aaf850523d8cbe3f5f1a5c78f689b223797bef902635f2493ab43498f36c"}, + {file = "coverage-7.10.0-cp310-cp310-manylinux1_x86_64.manylinux_2_28_x86_64.manylinux_2_5_x86_64.whl", hash = "sha256:4c3133ce3fa84023f7c6921c4dca711be0b658784c5a51a797168229eae26172"}, + {file = "coverage-7.10.0-cp310-cp310-manylinux2014_aarch64.manylinux_2_17_aarch64.manylinux_2_28_aarch64.whl", hash = "sha256:3747d1d0af85b17d3a156cd30e4bbacf893815e846dc6c07050e9769da2b138e"}, + {file = "coverage-7.10.0-cp310-cp310-musllinux_1_2_aarch64.whl", hash = "sha256:241923b350437f6a7cb343d9df72998305ef940c3c40009f06e05029a047677c"}, + {file = "coverage-7.10.0-cp310-cp310-musllinux_1_2_i686.whl", hash = "sha256:13e82e499309307104d58ac66f9eed237f7aaceab4325416645be34064d9a2be"}, + {file = "coverage-7.10.0-cp310-cp310-musllinux_1_2_x86_64.whl", hash = "sha256:bf73cdde4f6c9cd4457b00bf1696236796ac3a241f859a55e0f84a4c58326a7f"}, + {file = "coverage-7.10.0-cp310-cp310-win32.whl", hash = "sha256:2396e13275b37870a3345f58bce8b15a7e0a985771d13a4b16ce9129954e07d6"}, + {file = "coverage-7.10.0-cp310-cp310-win_amd64.whl", hash = "sha256:9d45c7c71fb3d2da92ab893602e3f28f2d1560cec765a27e1824a6e0f7e92cfd"}, + {file = "coverage-7.10.0-cp311-cp311-macosx_10_9_x86_64.whl", hash = "sha256:4abc01843581a6f9dd72d4d15761861190973a2305416639435ef509288f7a04"}, + {file = "coverage-7.10.0-cp311-cp311-macosx_11_0_arm64.whl", hash = "sha256:a2093297773111d7d748fe4a99b68747e57994531fb5c57bbe439af17c11c169"}, + {file = "coverage-7.10.0-cp311-cp311-manylinux1_i686.manylinux_2_28_i686.manylinux_2_5_i686.whl", hash = "sha256:58240e27815bf105bd975c2fd42e700839f93d5aad034ef976411193ca32dbfd"}, + {file = "coverage-7.10.0-cp311-cp311-manylinux1_x86_64.manylinux_2_28_x86_64.manylinux_2_5_x86_64.whl", hash = "sha256:d019eac999b40ad48521ea057958b07a9f549c0c6d257a20e5c7c4ba91af8d1c"}, + {file = "coverage-7.10.0-cp311-cp311-manylinux2014_aarch64.manylinux_2_17_aarch64.manylinux_2_28_aarch64.whl", hash = "sha256:35e0a1f5454bc80faf4ceab10d1d48f025f92046c9c0f3bec2e1a9dda55137f8"}, + {file = "coverage-7.10.0-cp311-cp311-musllinux_1_2_aarch64.whl", hash = "sha256:a93dd7759c416dd1cc754123b926d065055cb9a33b6699e64a1e5bdfae1ff459"}, + {file = "coverage-7.10.0-cp311-cp311-musllinux_1_2_i686.whl", hash = "sha256:7b3d737266048368a6ffd68f1ecd662c54de56535c82eb8f98a55ac216a72cbd"}, + {file = "coverage-7.10.0-cp311-cp311-musllinux_1_2_x86_64.whl", hash = "sha256:93227c2707cb0effd9163cd0d8f0d9ab628982f7a3e915d6d64c7107867b9a07"}, + {file = "coverage-7.10.0-cp311-cp311-win32.whl", hash = "sha256:69270af3014ab3058ad6108c6d0e218166f568b5a7a070dc3d62c0a63aca1c4d"}, + {file = "coverage-7.10.0-cp311-cp311-win_amd64.whl", hash = "sha256:43c16bbb661a7b4dafac0ab69e44d6dbcc6a64c4d93aefd89edc6f8911b6ab4a"}, + {file = "coverage-7.10.0-cp311-cp311-win_arm64.whl", hash = "sha256:14e7c23fcb74ed808efb4eb48fcd25a759f0e20f685f83266d1df174860e4733"}, + {file = "coverage-7.10.0-cp312-cp312-macosx_10_13_x86_64.whl", hash = "sha256:a2adcfdaf3b4d69b0c64ad024fe9dd6996782b52790fb6033d90f36f39e287df"}, + {file = "coverage-7.10.0-cp312-cp312-macosx_11_0_arm64.whl", hash = "sha256:2d7b27c2c0840e8eeff3f1963782bd9d3bc767488d2e67a31de18d724327f9f6"}, + {file = "coverage-7.10.0-cp312-cp312-manylinux1_i686.manylinux_2_28_i686.manylinux_2_5_i686.whl", hash = "sha256:0ed50429786e935517570b08576a661fd79032e6060985ab492b9d39ba8e66ee"}, + {file = "coverage-7.10.0-cp312-cp312-manylinux1_x86_64.manylinux_2_28_x86_64.manylinux_2_5_x86_64.whl", hash = "sha256:7171c139ab6571d70460ecf788b1dcaf376bfc75a42e1946b8c031d062bbbad4"}, + {file = "coverage-7.10.0-cp312-cp312-manylinux2014_aarch64.manylinux_2_17_aarch64.manylinux_2_28_aarch64.whl", hash = "sha256:4a726aac7e6e406e403cdee4c443a13aed3ea3d67d856414c5beacac2e70c04e"}, + {file = "coverage-7.10.0-cp312-cp312-musllinux_1_2_aarch64.whl", hash = "sha256:2886257481a14e953e96861a00c0fe7151117a523f0470a51e392f00640bba03"}, + {file = "coverage-7.10.0-cp312-cp312-musllinux_1_2_i686.whl", hash = "sha256:536578b79521e59c385a2e0a14a5dc2a8edd58761a966d79368413e339fc9535"}, + {file = "coverage-7.10.0-cp312-cp312-musllinux_1_2_x86_64.whl", hash = "sha256:77fae95558f7804a9ceefabf3c38ad41af1da92b39781b87197c6440dcaaa967"}, + {file = "coverage-7.10.0-cp312-cp312-win32.whl", hash = "sha256:97803e14736493eb029558e1502fe507bd6a08af277a5c8eeccf05c3e970cb84"}, + {file = "coverage-7.10.0-cp312-cp312-win_amd64.whl", hash = "sha256:4c73ab554e54ffd38d114d6bc4a7115fb0c840cf6d8622211bee3da26e4bd25d"}, + {file = "coverage-7.10.0-cp312-cp312-win_arm64.whl", hash = "sha256:3ae95d5a9aedab853641026b71b2ddd01983a0a7e9bf870a20ef3c8f5d904699"}, + {file = "coverage-7.10.0-cp313-cp313-macosx_10_13_x86_64.whl", hash = "sha256:d883fee92b9245c0120fa25b5d36de71ccd4cfc29735906a448271e935d8d86d"}, + {file = "coverage-7.10.0-cp313-cp313-macosx_11_0_arm64.whl", hash = "sha256:c87e59e88268d30e33d3665ede4fbb77b513981a2df0059e7c106ca3de537586"}, + {file = "coverage-7.10.0-cp313-cp313-manylinux1_i686.manylinux_2_28_i686.manylinux_2_5_i686.whl", hash = "sha256:f669d969f669a11d6ceee0b733e491d9a50573eb92a71ffab13b15f3aa2665d4"}, + {file = "coverage-7.10.0-cp313-cp313-manylinux1_x86_64.manylinux_2_28_x86_64.manylinux_2_5_x86_64.whl", hash = "sha256:9582bd6c6771300a847d328c1c4204e751dbc339a9e249eecdc48cada41f72e6"}, + {file = "coverage-7.10.0-cp313-cp313-manylinux2014_aarch64.manylinux_2_17_aarch64.manylinux_2_28_aarch64.whl", hash = "sha256:91f97e9637dc7977842776fdb7ad142075d6fa40bc1b91cb73685265e0d31d32"}, + {file = "coverage-7.10.0-cp313-cp313-musllinux_1_2_aarch64.whl", hash = "sha256:ae4fa92b6601a62367c6c9967ad32ad4e28a89af54b6bb37d740946b0e0534dd"}, + {file = "coverage-7.10.0-cp313-cp313-musllinux_1_2_i686.whl", hash = "sha256:3a5cc8b97473e7b3623dd17a42d2194a2b49de8afecf8d7d03c8987237a9552c"}, + {file = "coverage-7.10.0-cp313-cp313-musllinux_1_2_x86_64.whl", hash = "sha256:dc1cbb7f623250e047c32bd7aa1bb62ebc62608d5004d74df095e1059141ac88"}, + {file = "coverage-7.10.0-cp313-cp313-win32.whl", hash = "sha256:1380cc5666d778e77f1587cd88cc317158111f44d54c0dd3975f0936993284e0"}, + {file = "coverage-7.10.0-cp313-cp313-win_amd64.whl", hash = "sha256:bf03cf176af098ee578b754a03add4690b82bdfe070adfb5d192d0b1cd15cf82"}, + {file = "coverage-7.10.0-cp313-cp313-win_arm64.whl", hash = "sha256:8041c78cd145088116db2329b2fb6e89dc338116c962fbe654b7e9f5d72ab957"}, + {file = "coverage-7.10.0-cp313-cp313t-macosx_10_13_x86_64.whl", hash = "sha256:37cc2c06052771f48651160c080a86431884db9cd62ba622cab71049b90a95b3"}, + {file = "coverage-7.10.0-cp313-cp313t-macosx_11_0_arm64.whl", hash = "sha256:91f37270b16178b05fa107d85713d29bf21606e37b652d38646eef5f2dfbd458"}, + {file = "coverage-7.10.0-cp313-cp313t-manylinux1_i686.manylinux_2_28_i686.manylinux_2_5_i686.whl", hash = "sha256:f9b0b0168864d09bcb9a3837548f75121645c4cfd0efce0eb994c221955c5b10"}, + {file = "coverage-7.10.0-cp313-cp313t-manylinux1_x86_64.manylinux_2_28_x86_64.manylinux_2_5_x86_64.whl", hash = "sha256:df0be435d3b616e7d3ee3f9ebbc0d784a213986fe5dff9c6f1042ee7cfd30157"}, + {file = "coverage-7.10.0-cp313-cp313t-manylinux2014_aarch64.manylinux_2_17_aarch64.manylinux_2_28_aarch64.whl", hash = "sha256:35e9aba1c4434b837b1d567a533feba5ce205e8e91179c97974b28a14c23d3a0"}, + {file = "coverage-7.10.0-cp313-cp313t-musllinux_1_2_aarch64.whl", hash = "sha256:a0b0c481e74dfad631bdc2c883e57d8b058e5c90ba8ef087600995daf7bbec18"}, + {file = "coverage-7.10.0-cp313-cp313t-musllinux_1_2_i686.whl", hash = "sha256:8aec1b7c8922808a433c13cd44ace6fceac0609f4587773f6c8217a06102674b"}, + {file = "coverage-7.10.0-cp313-cp313t-musllinux_1_2_x86_64.whl", hash = "sha256:04ec59ceb3a594af0927f2e0d810e1221212abd9a2e6b5b917769ff48760b460"}, + {file = "coverage-7.10.0-cp313-cp313t-win32.whl", hash = "sha256:b6871e62d29646eb9b3f5f92def59e7575daea1587db21f99e2b19561187abda"}, + {file = "coverage-7.10.0-cp313-cp313t-win_amd64.whl", hash = "sha256:ff99cff2be44f78920b76803f782e91ffb46ccc7fa89eccccc0da3ca94285b64"}, + {file = "coverage-7.10.0-cp313-cp313t-win_arm64.whl", hash = "sha256:3246b63501348fe47299d12c47a27cfc221cfbffa1c2d857bcc8151323a4ae4f"}, + {file = "coverage-7.10.0-cp314-cp314-macosx_10_13_x86_64.whl", hash = "sha256:1f628d91f941a375b4503cb486148dbeeffb48e17bc080e0f0adfee729361574"}, + {file = "coverage-7.10.0-cp314-cp314-macosx_11_0_arm64.whl", hash = "sha256:3a0e101d5af952d233557e445f42ebace20b06b4ceb615581595ced5386caa78"}, + {file = "coverage-7.10.0-cp314-cp314-manylinux1_i686.manylinux_2_28_i686.manylinux_2_5_i686.whl", hash = "sha256:ec4c1abbcc53f9f650acb14ea71725d88246a9e14ed42f8dd1b4e1b694e9d842"}, + {file = "coverage-7.10.0-cp314-cp314-manylinux1_x86_64.manylinux_2_28_x86_64.manylinux_2_5_x86_64.whl", hash = "sha256:9c95f3a7f041b4cc68a8e3fecfa6366170c13ac773841049f1cd19c8650094e0"}, + {file = "coverage-7.10.0-cp314-cp314-manylinux2014_aarch64.manylinux_2_17_aarch64.manylinux_2_28_aarch64.whl", hash = "sha256:8a2cd597b69c16d24e310611f2ed6fcfb8f09429316038c03a57e7b4f5345244"}, + {file = "coverage-7.10.0-cp314-cp314-musllinux_1_2_aarch64.whl", hash = "sha256:5e18591906a40c2b3609196c9879136aa4a47c5405052ca6b065ab10cb0b71d0"}, + {file = "coverage-7.10.0-cp314-cp314-musllinux_1_2_i686.whl", hash = "sha256:485c55744252ed3f300cc1a0f5f365e684a0f2651a7aed301f7a67125906b80e"}, + {file = "coverage-7.10.0-cp314-cp314-musllinux_1_2_x86_64.whl", hash = "sha256:4dabea1516e5b0e9577282b149c8015e4dceeb606da66fb8d9d75932d5799bf5"}, + {file = "coverage-7.10.0-cp314-cp314-win32.whl", hash = "sha256:ac455f0537af22333fdc23b824cff81110dff2d47300bb2490f947b7c9a16017"}, + {file = "coverage-7.10.0-cp314-cp314-win_amd64.whl", hash = "sha256:b3c94b532f52f95f36fbfde3e178510a4d04eea640b484b2fe8f1491338dc653"}, + {file = "coverage-7.10.0-cp314-cp314-win_arm64.whl", hash = "sha256:2f807f2c3a9da99c80dfa73f09ef5fc3bd21e70c73ba1c538f23396a3a772252"}, + {file = "coverage-7.10.0-cp314-cp314t-macosx_10_13_x86_64.whl", hash = "sha256:0a889ef25215990f65073c32cadf37483363a6a22914186dedc15a6b1a597d50"}, + {file = "coverage-7.10.0-cp314-cp314t-macosx_11_0_arm64.whl", hash = "sha256:39c638ecf3123805bacbf71aff8091e93af490c676fca10ab4e442375076e483"}, + {file = "coverage-7.10.0-cp314-cp314t-manylinux1_i686.manylinux_2_28_i686.manylinux_2_5_i686.whl", hash = "sha256:f2f2c0df0cbcf7dffa14f88a99c530cdef3f4fcfe935fa4f95d28be2e7ebc570"}, + {file = "coverage-7.10.0-cp314-cp314t-manylinux1_x86_64.manylinux_2_28_x86_64.manylinux_2_5_x86_64.whl", hash = "sha256:048d19a5d641a2296745ab59f34a27b89a08c48d6d432685f22aac0ec1ea447f"}, + {file = "coverage-7.10.0-cp314-cp314t-manylinux2014_aarch64.manylinux_2_17_aarch64.manylinux_2_28_aarch64.whl", hash = "sha256:1209b65d302d7a762004be37ab9396cbd8c99525ed572bdf455477e3a9449e06"}, + {file = "coverage-7.10.0-cp314-cp314t-musllinux_1_2_aarch64.whl", hash = "sha256:e44aa79a36a7a0aec6ea109905a4a7c28552d90f34e5941b36217ae9556657d5"}, + {file = "coverage-7.10.0-cp314-cp314t-musllinux_1_2_i686.whl", hash = "sha256:96124be864b89395770c9a14652afcddbcdafb99466f53a9281c51d1466fb741"}, + {file = "coverage-7.10.0-cp314-cp314t-musllinux_1_2_x86_64.whl", hash = "sha256:aad222e841f94b42bd1d6be71737fade66943853f0807cf87887c88f70883a2a"}, + {file = "coverage-7.10.0-cp314-cp314t-win32.whl", hash = "sha256:0eed5354d28caa5c8ad60e07e938f253e4b2810ea7dd56784339b6ce98b6f104"}, + {file = "coverage-7.10.0-cp314-cp314t-win_amd64.whl", hash = "sha256:3da35f9980058acb960b2644527cc3911f1e00f94d309d704b309fa984029109"}, + {file = "coverage-7.10.0-cp314-cp314t-win_arm64.whl", hash = "sha256:cb9e138dfa8a4b5c52c92a537651e2ca4f2ca48d8cb1bc01a2cbe7a5773c2426"}, + {file = "coverage-7.10.0-cp39-cp39-macosx_10_9_x86_64.whl", hash = "sha256:cf283ec9c6878826291b17442eb5c32d3d252dc77d25e082b460b2d2ea67ba3c"}, + {file = "coverage-7.10.0-cp39-cp39-macosx_11_0_arm64.whl", hash = "sha256:8a83488c9fc6fff487f2ab551f9b64c70672357b8949f0951b0cd778b3ed8165"}, + {file = "coverage-7.10.0-cp39-cp39-manylinux1_i686.manylinux_2_28_i686.manylinux_2_5_i686.whl", hash = "sha256:b86df3a7494d12338c11e59f210a0498d6109bbc3a4037f44de517ebb30a9c6b"}, + {file = "coverage-7.10.0-cp39-cp39-manylinux1_x86_64.manylinux_2_28_x86_64.manylinux_2_5_x86_64.whl", hash = "sha256:6de9b460809e5e4787b742e786a36ae2346a53982e2be317cdcb7a33c56412fb"}, + {file = "coverage-7.10.0-cp39-cp39-manylinux2014_aarch64.manylinux_2_17_aarch64.manylinux_2_28_aarch64.whl", hash = "sha256:de5ef8a5954d63fa26a6aaa4600e48f885ce70fe495e8fce2c43aa9241fc9434"}, + {file = "coverage-7.10.0-cp39-cp39-musllinux_1_2_aarch64.whl", hash = "sha256:f178fe5e96f1e057527d5d0b20ab76b8616e0410169c33716cc226118eaf2c4f"}, + {file = "coverage-7.10.0-cp39-cp39-musllinux_1_2_i686.whl", hash = "sha256:4a38c42f0182a012fa9ec25bc6057e51114c1ba125be304f3f776d6d283cb303"}, + {file = "coverage-7.10.0-cp39-cp39-musllinux_1_2_x86_64.whl", hash = "sha256:bf09beb5c1785cb36aad042455c0afab561399b74bb8cdaf6e82b7d77322df99"}, + {file = "coverage-7.10.0-cp39-cp39-win32.whl", hash = "sha256:cb8dfbb5d3016cb8d1940444c0c69b40cdc6c8bde724b07716ee5ea47b5273c6"}, + {file = "coverage-7.10.0-cp39-cp39-win_amd64.whl", hash = "sha256:58ff22653cd93d563110d1ff2aef958f5f21be9e917762f8124d0e36f80f172a"}, + {file = "coverage-7.10.0-py3-none-any.whl", hash = "sha256:310a786330bb0463775c21d68e26e79973839b66d29e065c5787122b8dd4489f"}, + {file = "coverage-7.10.0.tar.gz", hash = "sha256:2768885aef484b5dcde56262cbdfba559b770bfc46994fe9485dc3614c7a5867"}, ] [package.extras] @@ -395,6 +416,26 @@ pygments = ">=2.7.2" [package.extras] dev = ["argcomplete", "attrs (>=19.2)", "hypothesis (>=3.56)", "mock", "requests", "setuptools", "xmlschema"] +[[package]] +name = "pytest-cov" +version = "6.2.1" +description = "Pytest plugin for measuring coverage." +optional = false +python-versions = ">=3.9" +groups = ["test"] +files = [ + {file = "pytest_cov-6.2.1-py3-none-any.whl", hash = "sha256:f5bc4c23f42f1cdd23c70b1dab1bbaef4fc505ba950d53e0081d0730dd7e86d5"}, + {file = "pytest_cov-6.2.1.tar.gz", hash = "sha256:25cc6cc0a5358204b8108ecedc51a9b57b34cc6b8c967cc2c01a4e00d8a67da2"}, +] + +[package.dependencies] +coverage = {version = ">=7.5", extras = ["toml"]} +pluggy = ">=1.2" +pytest = ">=6.2.5" + +[package.extras] +testing = ["fields", "hunter", "process-tests", "pytest-xdist", "virtualenv"] + [[package]] name = "python-dotenv" version = "1.1.1" @@ -503,4 +544,4 @@ typing-extensions = ">=4.12.0" [metadata] lock-version = "2.1" python-versions = "^3.12" -content-hash = "f5274cb99a5307fb389b434e47d7bea063341abf33c538472dbff5a8dac6d37a" +content-hash = "f909d5d0189c02dc89bb73ca0a7f670884daaa05daa3b3f051102e9302834157" diff --git a/pyproject.toml b/pyproject.toml index 7bc78d9..277543d 100755 --- a/pyproject.toml +++ b/pyproject.toml @@ -4,7 +4,7 @@ build-backend = "poetry.core.masonry.api" [tool.poetry] name = "pythonLogs" -version = "5.0.0" +version = "5.0.1" description = "High-performance Python logging library with file rotation and optimized caching for better performance" license = "MIT" readme = "README.md" @@ -43,16 +43,14 @@ pydantic-settings = "^2.10.1" python-dotenv = "^1.1.1" [tool.poetry.group.test.dependencies] -coverage = "^7.9.2" poethepoet = "^0.36.0" psutil = "^7.0.0" pytest = "^8.4.1" +pytest-cov = "^6.2.1" [tool.poe.tasks] -_test = "coverage run -m pytest -v" -_coverage_report = "coverage report" -_coverage_xml = "coverage xml" -tests = ["_test", "_coverage_report", "_coverage_xml"] +_test = "python -m pytest -v --cov=pythonLogs --cov-report=term --cov-report=xml --junitxml=junit.xml -o junit_family=legacy" +tests = ["_test"] test = ["tests"] [tool.black] @@ -66,6 +64,7 @@ markers = [ [tool.coverage.run] omit = [ + "build.py", "tests/*", ] diff --git a/pythonLogs/log_utils.py b/pythonLogs/log_utils.py index 6e77160..7982737 100644 --- a/pythonLogs/log_utils.py +++ b/pythonLogs/log_utils.py @@ -139,7 +139,7 @@ def is_older_than_x_days(path: str, days: int) -> bool: raise FileNotFoundError(errno.ENOENT, os.strerror(errno.ENOENT), path) try: - if int(days) in (0, 1): + if int(days) == 0: cutoff_time = datetime.now() else: cutoff_time = datetime.now() - timedelta(days=int(days)) @@ -153,17 +153,21 @@ def is_older_than_x_days(path: str, days: int) -> bool: # Cache stderr timezone for better performance @lru_cache(maxsize=1) -def _get_stderr_timezone(): +def get_stderr_timezone(): timezone_name = os.getenv("LOG_TIMEZONE", "UTC") if timezone_name.lower() == "localtime": return None # Use system local timezone - return ZoneInfo(timezone_name) + try: + return ZoneInfo(timezone_name) + except Exception: + # Fallback to local timezone if requested timezone is not available + return None def write_stderr(msg: str) -> None: """Write msg to stderr with optimized timezone handling""" try: - tz = _get_stderr_timezone() + tz = get_stderr_timezone() if tz is None: # Use local timezone dt = datetime.now() @@ -202,12 +206,17 @@ def get_log_path(directory: str, filename: str) -> str: @lru_cache(maxsize=32) -def _get_timezone_offset(timezone_: str) -> str: - """Cache timezone offset calculation""" +def get_timezone_offset(timezone_: str) -> str: + """Cache timezone offset calculation with fallback for missing timezone data""" if timezone_.lower() == "localtime": return time.strftime("%z") else: - return datetime.now(ZoneInfo(timezone_)).strftime("%z") + try: + return datetime.now(ZoneInfo(timezone_)).strftime("%z") + except Exception: + # Fallback to localtime if the requested timezone is not available + # This is common on Windows systems without full timezone data + return time.strftime("%z") def get_format(show_location: bool, name: str, timezone_: str) -> str: @@ -221,7 +230,7 @@ def get_format(show_location: bool, name: str, timezone_: str) -> str: if show_location: _debug_fmt = "[%(filename)s:%(funcName)s:%(lineno)d]:" - utc_offset = _get_timezone_offset(timezone_) + utc_offset = get_timezone_offset(timezone_) return f"[%(asctime)s.%(msecs)03d{utc_offset}]:[%(levelname)s]:{_logger_name}{_debug_fmt}%(message)s" @@ -235,13 +244,27 @@ def gzip_file_with_sufix(file_path: str, sufix: str) -> str | None: # Use pathlib for cleaner path operations renamed_dst = path_obj.with_name(f"{path_obj.stem}_{sufix}{path_obj.suffix}.gz") - try: - with open(file_path, "rb") as fin: - with gzip.open(renamed_dst, "wb", compresslevel=6) as fout: # Balanced compression - shutil.copyfileobj(fin, fout, length=64*1024) # type: ignore # 64KB chunks for better performance - except (OSError, IOError) as e: - write_stderr(f"Unable to gzip log file | {file_path} | {repr(e)}") - raise e + # Windows-specific retry mechanism for file locking issues + max_retries = 3 if sys.platform == "win32" else 1 + retry_delay = 0.1 # 100ms delay between retries + + for attempt in range(max_retries): + try: + with open(file_path, "rb") as fin: + with gzip.open(renamed_dst, "wb", compresslevel=6) as fout: # Balanced compression + shutil.copyfileobj(fin, fout, length=64*1024) # type: ignore # 64KB chunks for better performance + break # Success, exit retry loop + except PermissionError as e: + # Windows file locking issue - retry with delay + if attempt < max_retries - 1 and sys.platform == "win32": + time.sleep(retry_delay) + continue + # Final attempt failed or not Windows - treat as regular error + write_stderr(f"Unable to gzip log file | {file_path} | {repr(e)}") + raise e + except (OSError, IOError) as e: + write_stderr(f"Unable to gzip log file | {file_path} | {repr(e)}") + raise e try: path_obj.unlink() # Use pathlib for deletion @@ -254,13 +277,23 @@ def gzip_file_with_sufix(file_path: str, sufix: str) -> str | None: @lru_cache(maxsize=32) def get_timezone_function(time_zone: str) -> Callable: - """Get timezone function with caching for better performance""" + """Get timezone function with caching and fallback for missing timezone data""" match time_zone.lower(): case "utc": - return time.gmtime + try: + # Try to create UTC timezone to verify it's available + ZoneInfo("UTC") + return time.gmtime + except Exception: + # Fallback to localtime if UTC timezone data is missing + return time.localtime case "localtime": return time.localtime case _: - # Cache the timezone object - tz = ZoneInfo(time_zone) - return lambda *args: datetime.now(tz=tz).timetuple() + try: + # Cache the timezone object + tz = ZoneInfo(time_zone) + return lambda *args: datetime.now(tz=tz).timetuple() + except Exception: + # Fallback to localtime if the requested timezone is not available + return time.localtime diff --git a/pythonLogs/memory_utils.py b/pythonLogs/memory_utils.py index 0875b67..4c75ddb 100644 --- a/pythonLogs/memory_utils.py +++ b/pythonLogs/memory_utils.py @@ -175,12 +175,12 @@ def optimize_lru_cache_sizes() -> None: log_utils.get_timezone_function = lru_cache(maxsize=8)(log_utils.get_timezone_function.__wrapped__) # Clear and recreate timezone offset cache with smaller size - log_utils._get_timezone_offset.cache_clear() - log_utils._get_timezone_offset = lru_cache(maxsize=8)(log_utils._get_timezone_offset.__wrapped__) + log_utils.get_timezone_offset.cache_clear() + log_utils.get_timezone_offset = lru_cache(maxsize=8)(log_utils.get_timezone_offset.__wrapped__) # Clear and recreate stderr timezone cache with smaller size - log_utils._get_stderr_timezone.cache_clear() - log_utils._get_stderr_timezone = lru_cache(maxsize=4)(log_utils._get_stderr_timezone.__wrapped__) + log_utils.get_stderr_timezone.cache_clear() + log_utils.get_stderr_timezone = lru_cache(maxsize=4)(log_utils.get_stderr_timezone.__wrapped__) def force_garbage_collection() -> Dict[str, int]: diff --git a/tests/context_management/test_resource_management.py b/tests/context_management/test_resource_management.py index 453944d..113d151 100644 --- a/tests/context_management/test_resource_management.py +++ b/tests/context_management/test_resource_management.py @@ -62,6 +62,7 @@ def test_factory_registry_cleanup(self): # Verify handlers were closed and removed assert len(logger.handlers) == 0 + assert initial_handler_count > 0 # Ensure we actually had handlers to clean up assert len(LoggerFactory._logger_registry) == 0 def test_shutdown_specific_logger(self): @@ -236,6 +237,8 @@ def test_memory_usage_after_cleanup(self): # Logger should still exist due to registry assert logger_weakref() is not None + # Handlers should also still exist + assert all(ref() is not None for ref in handler_weakrefs) # Clear registry clear_logger_registry() @@ -244,7 +247,7 @@ def test_memory_usage_after_cleanup(self): gc.collect() # Logger should be garbage collected - # Note: This test might be flaky depending on Python's garbage collector + # Note: This test might be flaky depending on Python's garbage collector, # but it helps verify we're not holding unnecessary references print(f"Logger weakref after cleanup: {logger_weakref()}") diff --git a/tests/core/test_log_utils.py b/tests/core/test_log_utils.py index 21fd158..ad5abd1 100644 --- a/tests/core/test_log_utils.py +++ b/tests/core/test_log_utils.py @@ -1,12 +1,15 @@ +#!/usr/bin/env python3 # -*- encoding: utf-8 -*- +"""Utility functions and tests for log_utils module.""" import contextlib +import functools import io import logging import os -import shutil import sys import tempfile import time +from contextlib import contextmanager import pytest @@ -16,6 +19,319 @@ from pythonLogs import log_utils +# ============================================================================ +# UTILITY FUNCTIONS (formerly from test_utils.py) +# ============================================================================ + +def skip_if_no_zoneinfo_utc(): + """Skip test if zoneinfo or UTC timezone data is not available (common on Windows).""" + try: + from zoneinfo import ZoneInfo + ZoneInfo("UTC") # Test if UTC is available + except Exception: + pytest.skip("zoneinfo not available or UTC timezone data missing on this system") + + +def get_safe_timezone(): + """Get a timezone that works on all platforms.""" + try: + from zoneinfo import ZoneInfo + ZoneInfo("UTC") # Test if UTC is available + return "UTC" + except Exception: + return "localtime" # Fallback to localtime which should always work + + +def requires_zoneinfo_utc(func): + """Decorator to skip tests that require zoneinfo UTC support.""" + @functools.wraps(func) + def wrapper(*args, **kwargs): + skip_if_no_zoneinfo_utc() + return func(*args, **kwargs) + return wrapper + + +def requires_zoneinfo(timezone): + """Decorator to skip tests that require a specific timezone.""" + def decorator(func): + @functools.wraps(func) + def wrapper(*args, **kwargs): + try: + from zoneinfo import ZoneInfo + ZoneInfo(timezone) # Test if timezone is available + except Exception: + pytest.skip(f"Timezone '{timezone}' not available on this system") + return func(*args, **kwargs) + return wrapper + return decorator + + +def patch_logger_kwargs_with_safe_timezone(kwargs): + """Patch logger kwargs to use safe timezone if UTC is specified but not available.""" + if kwargs.get('timezone') == 'UTC': + try: + from zoneinfo import ZoneInfo + ZoneInfo("UTC") # Test if UTC is available + except Exception: + kwargs['timezone'] = 'localtime' # Fall back to localtime + return kwargs + + +def safe_delete_file(filepath, max_attempts=3, delay=0.1): + """ + Safely delete a file with Windows compatibility. + + On Windows, files can remain locked by processes even after being closed, + leading to PermissionError. This function tries multiple times with delays. + + Args: + filepath: Path to the file to delete + max_attempts: Maximum number of deletion attempts (default: 3) + delay: Delay between attempts in seconds (default: 0.1) + + Returns: + bool: True if file was deleted successfully, False otherwise + """ + if not os.path.exists(filepath): + return True # Already deleted + + for attempt in range(max_attempts): + try: + os.unlink(filepath) + return True + except PermissionError: + if sys.platform == "win32": + # On Windows, file might be locked - wait and retry + if attempt < max_attempts - 1: + time.sleep(delay) + continue + else: + # Last attempt failed - log and return False + print(f"Warning: Could not delete {filepath} after {max_attempts} attempts") + return False + else: + # On non-Windows systems, permission error is probably real + raise + except (OSError, IOError) as e: + # Other OS errors should be raised + raise e + + return False + + +def safe_close_and_delete_file(file_handler, filepath, max_attempts=3, delay=0.1): + """ + Safely close a file handler and delete the associated file. + + This function ensures proper closure of file handlers before attempting + deletion, which is crucial on Windows systems. + + Args: + file_handler: The file handler to close (can be None) + filepath: Path to the file to delete + max_attempts: Maximum number of deletion attempts (default: 3) + delay: Delay between attempts in seconds (default: 0.1) + + Returns: + bool: True if file was deleted successfully, False otherwise + """ + # Close the handler first if it exists + if file_handler is not None: + try: + file_handler.close() + except (OSError, AttributeError): + # The Handler might already be closed or not have a close method + pass + + # Small delay to ensure file handle is fully released + if sys.platform == "win32": + time.sleep(0.05) + + return safe_delete_file(filepath, max_attempts, delay) + + +def cleanup_logger_handlers(logger): + """ + Safely close and remove all handlers from a logger. + + This is crucial on Windows to ensure file handles are released + before attempting to delete temporary directories. + + Args: + logger: The logger whose handlers should be cleaned up + """ + if logger is None: + return + + # Get a copy of handlers to avoid modifying list while iterating + handlers = logger.handlers.copy() + + for handler in handlers: + try: + # Close the handler first + handler.close() + except (OSError, AttributeError): + # The Handler might already be closed or not have a close method + pass + finally: + # Remove the handler from the logger + try: + logger.removeHandler(handler) + except (ValueError, AttributeError): + # Handler might already be removed + pass + + # Small delay on Windows to ensure handles are fully released + if sys.platform == "win32": + time.sleep(0.05) + + +def cleanup_all_loggers(): + """ + Clean up all loggers by closing their handlers. + + This function iterates through all existing loggers and closes + their handlers to prevent file locking issues on Windows. + """ + import logging + + # Get all existing loggers + loggers_to_cleanup = [] + + # Get the root logger + root_logger = logging.getLogger() + if root_logger.handlers: + loggers_to_cleanup.append(root_logger) + + # Get all named loggers from the logger manager + for name in logging.Logger.manager.loggerDict: + logger = logging.getLogger(name) + if logger.handlers: + loggers_to_cleanup.append(logger) + + # Clean up all loggers + for logger in loggers_to_cleanup: + cleanup_logger_handlers(logger) + + # Additional delay on Windows + if sys.platform == "win32": + time.sleep(0.1) + + +def safe_delete_directory(directory_path, max_attempts=5, delay=0.2): + """ + Safely delete a directory with Windows compatibility. + + On Windows, directories can remain locked by processes even after + file handles are closed, leading to PermissionError. + + Args: + directory_path: Path to the directory to delete + max_attempts: Maximum number of deletion attempts (default: 5) + delay: Delay between attempts in seconds (default: 0.2) + + Returns: + bool: True if directory was deleted successfully, False otherwise + """ + import shutil + + if not os.path.exists(directory_path): + return True # Already deleted + + for attempt in range(max_attempts): + try: + shutil.rmtree(directory_path) + return True + except PermissionError: + if sys.platform == "win32": + # On Windows, directory might be locked - wait and retry + if attempt < max_attempts - 1: + # Clean up any remaining logger handlers + cleanup_all_loggers() + time.sleep(delay) + continue + else: + # Last attempt failed - log and return False + print(f"Warning: Could not delete directory {directory_path} after {max_attempts} attempts") + return False + else: + # On non-Windows systems, permission error is probably real + raise + except (OSError, IOError) as e: + # Other OS errors should be raised + raise e + + return False + + +@contextmanager +def windows_safe_temp_directory(**kwargs): + """ + Context manager for creating temporary directories that are safely cleaned up on Windows. + + This context manager handles Windows-specific file locking issues by ensuring + all logger handlers are cleaned up before attempting directory deletion. + + Args: + **kwargs: Arguments passed to tempfile.TemporaryDirectory + + Yields: + str: Path to the temporary directory + """ + # Clean up any existing loggers before creating temp directory + cleanup_all_loggers() + + temp_dir_obj = tempfile.TemporaryDirectory(**kwargs) + temp_dir = temp_dir_obj.__enter__() + + try: + yield temp_dir + finally: + try: + # Clean up all loggers and their handlers before directory deletion + cleanup_all_loggers() + + # Attempt normal cleanup first + temp_dir_obj.__exit__(None, None, None) + except (OSError, PermissionError): + # On Windows, if normal cleanup fails, use safe deletion + try: + safe_delete_directory(temp_dir) + except Exception: + # If all else fails, just log the issue + print(f"Warning: Could not clean up temporary directory {temp_dir}") + + +def create_windows_safe_temp_file(suffix="", prefix="tmp", dir=None, text=False): + """ + Create a temporary file with Windows-safe cleanup. + + Args: + suffix: File suffix (default: "") + prefix: File prefix (default: "tmp") + dir: Directory to create file in (default: None) + text: Whether to open in text mode (default: False) + + Returns: + tuple: (file_handle, file_path) + """ + import tempfile + + # Create temporary file + fd, filepath = tempfile.mkstemp(suffix=suffix, prefix=prefix, dir=dir, text=text) + + # Convert file descriptor to file handle + mode = 'w' if text else 'wb' + file_handle = os.fdopen(fd, mode) + + return file_handle, filepath + + +# ============================================================================ +# TEST CLASSES +# ============================================================================ + + class TestLogUtils: @classmethod def setup_class(cls): @@ -41,8 +357,10 @@ def test_check_filename_instance(self): assert filenames in str(exec_info.value) assert "Unable to parse filenames" in str(exec_info.value) + @pytest.mark.skipif(sys.platform == "win32", reason="Unix/Linux/macOS-specific chmod test") def test_check_directory_permissions(self): - # Test permission error on access + """Test directory permission checking (Unix/Linux/macOS).""" + # Unix-style permission testing directory = os.path.join(tempfile.gettempdir(), "test_permission") os.makedirs(directory, mode=0o000, exist_ok=True) # No permissions at all assert os.path.exists(directory) == True @@ -54,12 +372,18 @@ def test_check_directory_permissions(self): log_utils.delete_file(directory) assert os.path.exists(directory) == False - # test permission error on creation - directory = "/non-existent-directory" - with pytest.raises(PermissionError) as exec_info: - log_utils.check_directory_permissions(directory) - assert type(exec_info.value) is PermissionError - assert "Unable to create directory" in str(exec_info.value) + # test permission error on creation - use a readonly parent directory + with tempfile.TemporaryDirectory() as temp_dir: + readonly_parent = os.path.join(temp_dir, "readonly") + os.makedirs(readonly_parent, mode=0o555) # Read-only parent + try: + non_existent = os.path.join(readonly_parent, "non-existent-directory") + with pytest.raises(PermissionError) as exec_info: + log_utils.check_directory_permissions(non_existent) + assert type(exec_info.value) is PermissionError + assert "Unable to create directory" in str(exec_info.value) + finally: + os.chmod(readonly_parent, 0o755) # Restore permissions for cleanup def test_remove_old_logs(self): directory = os.path.join(tempfile.gettempdir(), "test_remove_logs") @@ -78,27 +402,38 @@ def test_remove_old_logs(self): assert os.path.exists(directory) == False def test_delete_file(self): - directory = tempfile.gettempdir() - tmpfilewrapper = tempfile.NamedTemporaryFile(dir=directory, suffix=".log") - file_path = tmpfilewrapper.name + """Test delete_file with standard Unix/Linux file handling.""" + with tempfile.NamedTemporaryFile(mode="w", suffix=".log", delete=False) as tmp_file: + file_path = tmp_file.name + tmp_file.write("test content") + assert os.path.isfile(file_path) == True log_utils.delete_file(file_path) assert os.path.isfile(file_path) == False def test_is_older_than_x_days(self): - directory = tempfile.gettempdir() - tmpfilewrapper = tempfile.NamedTemporaryFile(dir=directory, suffix=".log") - file_path = tmpfilewrapper.name - assert os.path.isfile(file_path) == True + """Test is_older_than_x_days with standard Unix/Linux file handling.""" + with tempfile.NamedTemporaryFile(mode="w", suffix=".log", delete=False) as tmp_file: + file_path = tmp_file.name + tmp_file.write("test content") - result = log_utils.is_older_than_x_days(file_path, 1) - assert result == True + try: + assert os.path.isfile(file_path) == True - result = log_utils.is_older_than_x_days(file_path, 5) - assert result == False + # When days=1, it compares against 1 day ago, so newly created file should NOT be older + result = log_utils.is_older_than_x_days(file_path, 1) + assert result == False - log_utils.delete_file(file_path) - assert os.path.isfile(file_path) == False + # When days=5, it compares against 5 days ago, so newly created file should NOT be older + result = log_utils.is_older_than_x_days(file_path, 5) + assert result == False + + log_utils.delete_file(file_path) + assert os.path.isfile(file_path) == False + finally: + # Ensure cleanup if the test fails + if os.path.exists(file_path): + os.unlink(file_path) def test_get_level(self): level = log_utils.get_level(11111111) @@ -123,6 +458,7 @@ def test_get_level(self): assert level == logging.CRITICAL def test_get_log_path(self): + import sys with tempfile.TemporaryDirectory() as temp_dir: test_file = "test.log" # Test 1: Valid directory should return the correct path @@ -135,62 +471,93 @@ def test_get_log_path(self): assert result == os.path.join(new_dir, test_file) assert os.path.exists(new_dir) # Should have been created - # Test 3: Existing but non-writable directory should raise PermissionError - readonly_dir = os.path.join(temp_dir, "readonly") - os.makedirs(readonly_dir, mode=0o555) - try: - with pytest.raises(PermissionError) as exc_info: - log_utils.get_log_path(readonly_dir, test_file) - assert "Unable to access directory" in str(exc_info.value) - finally: - os.chmod(readonly_dir, 0o755) # Cleanup permissions - os.rmdir(readonly_dir) + # Test 3: Existing but non-writable directory should raise PermissionError + # This test only works on Unix/Linux/macOS systems with chmod + if sys.platform != "win32": + readonly_dir = os.path.join(temp_dir, "readonly") + os.makedirs(readonly_dir, mode=0o555) + try: + with pytest.raises(PermissionError) as exc_info: + log_utils.get_log_path(readonly_dir, test_file) + assert "Unable to access directory" in str(exc_info.value) + finally: + os.chmod(readonly_dir, 0o755) # Cleanup permissions + os.rmdir(readonly_dir) def test_get_format(self): show_location = True name = "test1" timezone = "UTC" result = log_utils.get_format(show_location, name, timezone) - assert result == ( - f"[%(asctime)s.%(msecs)03d+0000]:[%(levelname)s]:[{name}]:" - "[%(filename)s:%(funcName)s:%(lineno)d]:%(message)s" - ) + # On systems without UTC timezone data, this falls back to localtime + # Just verify the format structure is correct + assert f"[{name}]:" in result + assert "[%(filename)s:%(funcName)s:%(lineno)d]:" in result + assert "%(message)s" in result show_location = False name = "test2" timezone = "America/Los_Angeles" result = log_utils.get_format(show_location, name, timezone) - assert result.startswith("[%(asctime)s.%(msecs)03d-0") - assert result.endswith(f"]:[%(levelname)s]:[{name}]:%(message)s") + # On systems without this timezone, it falls back to localtime + # Just verify the basic structure + assert f"[{name}]:" in result + assert "%(message)s" in result show_location = False name = "test3" timezone = "Australia/Queensland" result = log_utils.get_format(show_location, name, timezone) - assert result == f"[%(asctime)s.%(msecs)03d+1000]:[%(levelname)s]:[{name}]:%(message)s" + # On systems without timezone data (common on Windows), this falls back to localtime + # Test should verify format structure rather than hardcoded timezone offset + expected_base_format = f"[%(asctime)s.%(msecs)03d" + assert result.startswith(expected_base_format) + assert f"]:[%(levelname)s]:[{name}]:%(message)s" in result + # Verify timezone offset is present (either +1000 or fallback) + import re + # The % characters need to be literal in the regex + offset_pattern = r'\[%\(asctime\)s\.%\(msecs\)03d([+-]\d{4})\]' + match = re.search(offset_pattern, result) + assert match is not None, f"No timezone offset found in format: {result}" + # The offset could be +1000 (if timezone is available) or system localtime fallback + offset = match.group(1) + assert re.match(r'[+-]\d{4}', offset), f"Invalid timezone offset format: {offset}" def test_gzip_file_with_sufix(self): - directory = tempfile.gettempdir() - tmpfilewrapper = tempfile.NamedTemporaryFile(dir=directory, suffix=".log") - file_path = tmpfilewrapper.name - assert os.path.isfile(file_path) == True - sufix = "test1" - result = log_utils.gzip_file_with_sufix(file_path, sufix) - file_path_no_suffix = file_path.split(".")[0] - assert result == f"{file_path_no_suffix}_{sufix}.log.gz" - log_utils.delete_file(result) - assert os.path.isfile(result) == False - - # test a non-existent file - file_path = "/non-existent-directory/test2.log" - sufix = "test2" - result = log_utils.gzip_file_with_sufix(file_path, sufix) - assert result is None + """Test gzip_file_with_sufix with standard Unix/Linux file handling.""" + with tempfile.NamedTemporaryFile(mode="w", suffix=".log", delete=False) as tmp_file: + file_path = tmp_file.name + tmp_file.write("test content for gzip") + + try: + assert os.path.isfile(file_path) == True + sufix = "test1" + result = log_utils.gzip_file_with_sufix(file_path, sufix) + file_path_no_suffix = file_path.split(".")[0] + assert result == f"{file_path_no_suffix}_{sufix}.log.gz" + + # Clean up the gzipped file + if os.path.exists(result): + os.unlink(result) + assert os.path.isfile(result) == False + + finally: + # Ensure cleanup of the original file if it still exists + if os.path.exists(file_path): + os.unlink(file_path) + + # test a non-existent file - use tempfile path that doesn't exist + with tempfile.TemporaryDirectory() as temp_dir: + file_path = os.path.join(temp_dir, "non-existent-directory", "test2.log") + sufix = "test2" + result = log_utils.gzip_file_with_sufix(file_path, sufix) + assert result is None def test_get_timezone_function(self): timezone = "UTC" result = log_utils.get_timezone_function(timezone) - assert result.__name__ == "gmtime" + # On systems without UTC timezone data, this may fall back to localtime + assert result.__name__ in ["gmtime", "localtime"] timezone = "localtime" result = log_utils.get_timezone_function(timezone) @@ -198,7 +565,8 @@ def test_get_timezone_function(self): timezone = "America/Los_Angeles" result = log_utils.get_timezone_function(timezone) - assert result.__name__ == "" + # On systems without timezone data (common on Windows), this falls back to localtime + assert result.__name__ in ["", "localtime"] def test_write_stderr(self): """Test write_stderr function output""" @@ -266,7 +634,7 @@ def test_get_logger_and_formatter_cleanup(self): assert len(new_logger.handlers) == 0 def test_timezone_offset_caching(self): - """Test _get_timezone_offset function via get_format""" + """Test get_timezone_offset function via get_format""" # Test UTC timezone format1 = log_utils.get_format(False, "test", "UTC") format2 = log_utils.get_format(False, "test", "UTC") @@ -278,7 +646,7 @@ def test_timezone_offset_caching(self): assert format3 is not None def test_stderr_timezone_caching(self): - """Test _get_stderr_timezone function via write_stderr""" + """Test get_stderr_timezone function via write_stderr""" # Test with UTC original_tz = os.environ.get("LOG_TIMEZONE") os.environ["LOG_TIMEZONE"] = "UTC" @@ -289,7 +657,9 @@ def test_stderr_timezone_caching(self): log_utils.write_stderr("Test UTC message") output = stderr_capture.getvalue() - assert "+0000" in output or "Z" in output # UTC timezone indicator + # On systems with UTC timezone data, should have +0000 or Z + # On Windows without timezone data, falls back to local time (no timezone indicator) + assert "+0000" in output or "Z" in output or ("]:[ERROR]:" in output and "Test UTC message" in output) finally: if original_tz is not None: os.environ["LOG_TIMEZONE"] = original_tz @@ -297,7 +667,7 @@ def test_stderr_timezone_caching(self): del os.environ["LOG_TIMEZONE"] def test_stderr_timezone_localtime(self): - """Test _get_stderr_timezone with localtime""" + """Test get_stderr_timezone with localtime""" original_tz = os.environ.get("LOG_TIMEZONE") os.environ["LOG_TIMEZONE"] = "localtime" @@ -362,6 +732,7 @@ def test_gzip_file_error_handling(self): result = log_utils.gzip_file_with_sufix("/non/existent/file.log", "test") assert result is None + def test_remove_old_logs_edge_cases(self): """Test remove_old_logs with edge cases""" with tempfile.TemporaryDirectory() as temp_dir: @@ -435,6 +806,7 @@ def close(self): assert new_logger is logger assert len(new_logger.handlers) == 0 + @pytest.mark.skipif(sys.platform == "win32", reason="Unix/Linux/macOS-specific chmod test") def test_remove_old_logs_file_error(self): """Test remove_old_logs error handling when file deletion fails""" with tempfile.TemporaryDirectory() as temp_dir: @@ -512,8 +884,9 @@ def test_delete_file_special_file(self): assert result == True assert not os.path.exists(link_file) + @pytest.mark.skipif(sys.platform == "win32", reason="Unix/Linux/macOS-specific chmod test") def test_get_log_path_permission_error(self): - """Test get_log_path when directory exists but is not writable""" + """Test get_log_path when directory exists but is not writable (Unix/Linux/macOS)""" with tempfile.TemporaryDirectory() as temp_dir: # Create a subdirectory and make it read-only readonly_dir = os.path.join(temp_dir, "readonly") @@ -529,8 +902,9 @@ def test_get_log_path_permission_error(self): finally: os.chmod(readonly_dir, 0o755) # Restore for cleanup + @pytest.mark.skipif(sys.platform == "win32", reason="Unix/Linux/macOS-specific chmod test") def test_gzip_file_io_error(self): - """Test gzip_file_with_sufix error handling during compression""" + """Test gzip_file_with_sufix error handling during compression (Unix/Linux/macOS)""" with tempfile.TemporaryDirectory() as temp_dir: # Create a test file test_file = os.path.join(temp_dir, "test.log") @@ -551,6 +925,7 @@ def test_gzip_file_io_error(self): finally: os.chmod(temp_dir, 0o755) # Restore for cleanup + @pytest.mark.skipif(sys.platform == "win32", reason="Unix/Linux/macOS-specific chmod test") def test_gzip_file_deletion_error(self): """Test gzip_file_with_sufix error when source file deletion fails""" with tempfile.TemporaryDirectory() as temp_dir: @@ -564,21 +939,21 @@ def test_gzip_file_deletion_error(self): assert result is not None assert result.endswith("_test.log.gz") - # Clean up + # Clean up with Windows-compatible deletion if os.path.exists(result): - os.unlink(result) + safe_delete_file(result) def test_write_stderr_fallback(self): """Test write_stderr fallback when timezone operations fail""" # Save original function - original_get_stderr_tz = log_utils._get_stderr_timezone + original_get_stderr_tz = log_utils.get_stderr_timezone - # Mock _get_stderr_timezone to raise an error + # Mock get_stderr_timezone to raise an error def mock_error_timezone(): raise KeyError("Mock timezone error") try: - log_utils._get_stderr_timezone = mock_error_timezone + log_utils.get_stderr_timezone = mock_error_timezone stderr_capture = io.StringIO() with contextlib.redirect_stderr(stderr_capture): @@ -589,17 +964,17 @@ def mock_error_timezone(): assert "ERROR" in output finally: # Restore original function - log_utils._get_stderr_timezone = original_get_stderr_tz + log_utils.get_stderr_timezone = original_get_stderr_tz def test_stderr_timezone_with_special_timezone(self): - """Test _get_stderr_timezone with different timezone configurations""" + """Test get_stderr_timezone with different timezone configurations""" original_tz = os.environ.get("LOG_TIMEZONE") try: # Test with a specific timezone os.environ["LOG_TIMEZONE"] = "Europe/London" # Clear the cache - log_utils._get_stderr_timezone.cache_clear() + log_utils.get_stderr_timezone.cache_clear() stderr_capture = io.StringIO() with contextlib.redirect_stderr(stderr_capture): @@ -613,7 +988,7 @@ def test_stderr_timezone_with_special_timezone(self): os.environ["LOG_TIMEZONE"] = original_tz elif "LOG_TIMEZONE" in os.environ: del os.environ["LOG_TIMEZONE"] - log_utils._get_stderr_timezone.cache_clear() + log_utils.get_stderr_timezone.cache_clear() def test_check_filename_instance_edge_cases(self): """Test check_filename_instance with more edge cases.""" @@ -637,7 +1012,7 @@ def test_lru_cache_behavior_verification(self): """Test LRU cache behavior in timezone functions.""" # Clear caches first log_utils.get_timezone_function.cache_clear() - log_utils._get_timezone_offset.cache_clear() + log_utils.get_timezone_offset.cache_clear() # Test get_timezone_function cache initial_cache = log_utils.get_timezone_function.cache_info() @@ -653,18 +1028,17 @@ def test_lru_cache_behavior_verification(self): assert cache_info.currsize == 2 # Two unique calls assert cache_info.hits >= 1 # At least one cache hit - # Test _get_timezone_offset cache - offset1 = log_utils._get_timezone_offset("UTC") - offset2 = log_utils._get_timezone_offset("UTC") + # Test get_timezone_offset cache + offset1 = log_utils.get_timezone_offset("UTC") + offset2 = log_utils.get_timezone_offset("UTC") assert offset1 == offset2 - offset_cache = log_utils._get_timezone_offset.cache_info() + offset_cache = log_utils.get_timezone_offset.cache_info() assert offset_cache.currsize >= 1 assert offset_cache.hits >= 1 def test_thread_safety_directory_check(self): """Test thread safety of directory permission checking.""" - import threading import concurrent.futures errors = [] @@ -726,19 +1100,23 @@ def test_get_timezone_function_edge_cases(self): """Test get_timezone_function with various timezone inputs.""" # Test standard timezones utc_func = log_utils.get_timezone_function("UTC") - assert utc_func.__name__ == "gmtime" + # On systems without UTC timezone data (common on Windows), this falls back to localtime + assert utc_func.__name__ in ["gmtime", "localtime"] local_func = log_utils.get_timezone_function("localtime") assert local_func.__name__ == "localtime" - # Test case insensitivity + # Test case insensitivity - both should return the same function (cached) utc_upper = log_utils.get_timezone_function("UTC") utc_lower = log_utils.get_timezone_function("utc") assert utc_upper is utc_lower # Should be cached + # Both should be either gmtime or localtime (fallback) + assert utc_upper.__name__ in ["gmtime", "localtime"] # Test custom timezone custom_func = log_utils.get_timezone_function("America/New_York") - assert custom_func.__name__ == "" + # On systems without timezone data (common on Windows), this falls back to localtime + assert custom_func.__name__ in ["", "localtime"] # Test function returns proper time tuple time_tuple = custom_func() @@ -812,7 +1190,7 @@ def test_path_operations_edge_cases(self): def test_timezone_offset_various_timezones(self): """Test timezone offset calculation for various timezones.""" # Clear cache first - log_utils._get_timezone_offset.cache_clear() + log_utils.get_timezone_offset.cache_clear() # Test various timezones timezones = [ @@ -825,7 +1203,7 @@ def test_timezone_offset_various_timezones(self): for tz, expected_offset in timezones: try: - offset = log_utils._get_timezone_offset(tz) + offset = log_utils.get_timezone_offset(tz) assert isinstance(offset, str) assert len(offset) == 5 # Format: +/-HHMM assert offset[0] in ['+', '-'] @@ -874,8 +1252,8 @@ def test_memory_efficiency_verification(self): # Clear all caches log_utils.get_timezone_function.cache_clear() - log_utils._get_timezone_offset.cache_clear() - log_utils._get_stderr_timezone.cache_clear() + log_utils.get_timezone_offset.cache_clear() + log_utils.get_stderr_timezone.cache_clear() log_utils._checked_directories.clear() # Test that repeated operations don't significantly increase memory @@ -884,10 +1262,380 @@ def test_memory_efficiency_verification(self): # Perform many operations for i in range(100): log_utils.get_timezone_function("UTC") - log_utils._get_timezone_offset("UTC") + log_utils.get_timezone_offset("UTC") log_utils.get_format(False, f"test_{i}", "UTC") # Reference count shouldn't grow significantly final_refs = sys.getrefcount(log_utils.get_timezone_function) ref_growth = final_refs - initial_refs assert ref_growth < 50, f"Memory leak detected: reference count grew by {ref_growth}" + + def test_directory_permissions_double_checked_locking(self): + """Test the double-checked locking pattern in check_directory_permissions.""" + import threading + + with tempfile.TemporaryDirectory() as temp_dir: + # Clear cache first + log_utils._checked_directories.clear() + + # Create a barrier to synchronize threads + barrier = threading.Barrier(2) + results = [] + + def worker(): + barrier.wait() # Ensure both threads start at the same time + log_utils.check_directory_permissions(temp_dir) + results.append(temp_dir in log_utils._checked_directories) + + # Start two threads that will both try to check the same directory + threads = [threading.Thread(target=worker) for _ in range(2)] + for t in threads: + t.start() + + for t in threads: + t.join() + + # Both should have seen the directory in the cache + assert all(results) + assert temp_dir in log_utils._checked_directories + + @pytest.mark.skipif(sys.platform == "win32", reason="Unix/Linux/macOS-specific FIFO test") + def test_delete_file_special_file_coverage(self): + """Test delete_file with special file that exists but is neither file nor dir.""" + # This tests the elif path_obj.exists() branch (line 125) + # Create a FIFO (named pipe) which is a special file type + with tempfile.TemporaryDirectory() as temp_dir: + fifo_path = os.path.join(temp_dir, "test_fifo") + try: + os.mkfifo(fifo_path) + assert os.path.exists(fifo_path) + assert not os.path.isfile(fifo_path) + assert not os.path.isdir(fifo_path) + + # delete_file should handle this special file + result = log_utils.delete_file(fifo_path) + assert result == True + assert not os.path.exists(fifo_path) + except OSError: + # FIFO creation might not be supported on all systems + pytest.skip("FIFO creation not supported on this system") + + def test_stderr_timezone_fallback_exception(self): + """Test get_stderr_timezone fallback when ZoneInfo raises exception.""" + original_tz = os.environ.get("LOG_TIMEZONE") + + try: + # Set an invalid timezone to trigger the exception path + os.environ["LOG_TIMEZONE"] = "Invalid/NonExistent/Timezone" + log_utils.get_stderr_timezone.cache_clear() + + # This should trigger the exception and fallback to None + result = log_utils.get_stderr_timezone() + assert result is None # Should fall back to None (local timezone) + + finally: + if original_tz is not None: + os.environ["LOG_TIMEZONE"] = original_tz + elif "LOG_TIMEZONE" in os.environ: + del os.environ["LOG_TIMEZONE"] + log_utils.get_stderr_timezone.cache_clear() + + def test_write_stderr_local_timezone_path(self): + """Test write_stderr when using local timezone (tz is None).""" + original_tz = os.environ.get("LOG_TIMEZONE") + + try: + # Set timezone to localtime to trigger the tz is None path + os.environ["LOG_TIMEZONE"] = "localtime" + log_utils.get_stderr_timezone.cache_clear() + + stderr_capture = io.StringIO() + with contextlib.redirect_stderr(stderr_capture): + log_utils.write_stderr("Test local timezone message") + + output = stderr_capture.getvalue() + assert "Test local timezone message" in output + assert "ERROR" in output + # Should use local timezone (line 173: dt = datetime.now()) + + finally: + if original_tz is not None: + os.environ["LOG_TIMEZONE"] = original_tz + elif "LOG_TIMEZONE" in os.environ: + del os.environ["LOG_TIMEZONE"] + log_utils.get_stderr_timezone.cache_clear() + + @pytest.mark.skipif(sys.platform == "win32", reason="Unix/Linux/macOS-specific chmod test") + def test_get_log_path_write_permission_error(self): + """Test get_log_path when directory exists but write check fails (Unix/Linux/macOS).""" + with tempfile.TemporaryDirectory() as temp_dir: + # Create a directory and make it non-writable + test_dir = os.path.join(temp_dir, "non_writable") + os.makedirs(test_dir) + + # Add to cache first to bypass check_directory_permissions + log_utils._checked_directories.add(test_dir) + + # Make directory non-writable + os.chmod(test_dir, 0o555) # Read and execute only + + try: + with pytest.raises(PermissionError) as exc_info: + log_utils.get_log_path(test_dir, "test.log") + + # Should hit lines 201-203 + assert "Unable to write to log directory" in str(exc_info.value) + + finally: + os.chmod(test_dir, 0o755) # Restore for cleanup + log_utils._checked_directories.discard(test_dir) + + def test_timezone_offset_fallback_exception(self): + """Test get_timezone_offset fallback when ZoneInfo raises exception.""" + log_utils.get_timezone_offset.cache_clear() + + # Test with invalid timezone that will trigger exception path + result = log_utils.get_timezone_offset("Invalid/Timezone/That/Does/Not/Exist") + + # Should fall back to localtime (lines 216-219) + assert isinstance(result, str) + assert len(result) == 5 # Format: +/-HHMM + assert result[0] in ['+', '-'] + + def test_gzip_file_source_deletion_error_coverage(self): + """Test gzip_file_with_sufix when source file deletion fails.""" + with tempfile.TemporaryDirectory() as temp_dir: + # Create test file + test_file = os.path.join(temp_dir, "test.log") + with open(test_file, "w") as f: + f.write("test content") + + # Mock Path.unlink to raise OSError during deletion + import unittest.mock + from pathlib import Path + + original_unlink = Path.unlink + def mock_unlink(self): + if str(self) == test_file: + raise OSError("Mock deletion error") + return original_unlink(self) + + try: + stderr_capture = io.StringIO() + with contextlib.redirect_stderr(stderr_capture): + with unittest.mock.patch.object(Path, 'unlink', mock_unlink): + with pytest.raises(OSError): + log_utils.gzip_file_with_sufix(test_file, "test") + + # Should hit lines 257-259 + output = stderr_capture.getvalue() + assert "Unable to delete source log file" in output + + finally: + Path.unlink = original_unlink + + def test_get_timezone_function_utc_fallback(self): + """Test get_timezone_function UTC fallback when ZoneInfo UTC fails.""" + log_utils.get_timezone_function.cache_clear() + + # Mock the entire zoneinfo module to raise exception for UTC + import unittest.mock + + def mock_zoneinfo(key): + if key == "UTC": + raise Exception("Mock UTC timezone error") + # Return the real ZoneInfo for other timezones + from zoneinfo import ZoneInfo + return ZoneInfo(key) + + try: + with unittest.mock.patch('pythonLogs.log_utils.ZoneInfo', side_effect=mock_zoneinfo): + result = log_utils.get_timezone_function("UTC") + + # Should fall back to localtime (lines 273-275) + assert result.__name__ == "localtime" + + finally: + log_utils.get_timezone_function.cache_clear() + + def test_get_timezone_function_custom_timezone_fallback(self): + """Test get_timezone_function custom timezone fallback.""" + log_utils.get_timezone_function.cache_clear() + + # Mock the entire zoneinfo module to raise exception for custom timezone + import unittest.mock + + def mock_zoneinfo(key): + if key == "Custom/Timezone": + raise Exception("Mock custom timezone error") + # Return the real ZoneInfo for other timezones + from zoneinfo import ZoneInfo + return ZoneInfo(key) + + try: + with unittest.mock.patch('pythonLogs.log_utils.ZoneInfo', side_effect=mock_zoneinfo): + result = log_utils.get_timezone_function("Custom/Timezone") + + # Should fall back to localtime (lines 283-285) + assert result.__name__ == "localtime" + + finally: + log_utils.get_timezone_function.cache_clear() + + def test_gzip_file_osioerror_handling(self): + """Test gzip_file_with_sufix OSError/IOError handling during compression.""" + with tempfile.TemporaryDirectory() as temp_dir: + # Create a test file + test_file = os.path.join(temp_dir, "test_osioerror.log") + with open(test_file, "w") as f: + f.write("test content for OSError test") + + # Mock gzip.open to raise OSError during compression + import unittest.mock + + original_gzip_open = __import__('gzip').open + def mock_gzip_open(*args, **kwargs): + # Raise OSError to trigger lines 265-267 + raise OSError("Mock OSError during gzip compression") + + try: + with unittest.mock.patch('gzip.open', side_effect=mock_gzip_open): + stderr_capture = io.StringIO() + with contextlib.redirect_stderr(stderr_capture): + with pytest.raises(OSError) as exc_info: + log_utils.gzip_file_with_sufix(test_file, "osioerror_test") + + # Verify the error was logged to stderr (line 266) + output = stderr_capture.getvalue() + assert "Unable to gzip log file" in output + assert test_file in output + assert "Mock OSError during gzip compression" in output + + # Verify the exception was re-raised (line 267) + assert "Mock OSError during gzip compression" in str(exc_info.value) + + finally: + # Cleanup: remove the test file if it still exists + if os.path.exists(test_file): + os.unlink(test_file) + + def test_gzip_file_ioerror_handling(self): + """Test gzip_file_with_sufix IOError handling during compression.""" + with tempfile.TemporaryDirectory() as temp_dir: + # Create a test file + test_file = os.path.join(temp_dir, "test_ioerror.log") + with open(test_file, "w") as f: + f.write("test content for IOError test") + + # Mock shutil.copyfileobj to raise IOError during copy + import unittest.mock + + def mock_copyfileobj(*args, **kwargs): + # Raise IOError to trigger lines 265-267 + raise IOError("Mock IOError during file copy") + + try: + with unittest.mock.patch('shutil.copyfileobj', side_effect=mock_copyfileobj): + stderr_capture = io.StringIO() + with contextlib.redirect_stderr(stderr_capture): + with pytest.raises(IOError) as exc_info: + log_utils.gzip_file_with_sufix(test_file, "ioerror_test") + + # Verify the error was logged to stderr (line 266) + output = stderr_capture.getvalue() + assert "Unable to gzip log file" in output + assert test_file in output + assert "Mock IOError during file copy" in output + + # Verify the exception was re-raised (line 267) + assert "Mock IOError during file copy" in str(exc_info.value) + + finally: + # Cleanup: remove the test file if it still exists + if os.path.exists(test_file): + os.unlink(test_file) + + def test_gzip_file_windows_retry_mechanism_coverage(self): + """Test gzip_file_with_sufix Windows retry mechanism for coverage.""" + with tempfile.TemporaryDirectory() as temp_dir: + # Create a test file + test_file = os.path.join(temp_dir, "test_retry.log") + with open(test_file, "w") as f: + f.write("test content for retry mechanism") + + import unittest.mock + + # Mock to simulate Windows platform and PermissionError on first attempt + call_count = 0 + original_open = open + + def mock_open_side_effect(*args, **kwargs): + nonlocal call_count + call_count += 1 + if args[0] == test_file and call_count == 1: + # First call - simulate Windows file locking + raise PermissionError("The process cannot access the file") + else: + # Subsequent calls - use real open + return original_open(*args, **kwargs) + + # Mock sys.platform to be Windows and time.sleep to verify retry + with unittest.mock.patch('pythonLogs.log_utils.sys.platform', 'win32'): + with unittest.mock.patch('pythonLogs.log_utils.time.sleep') as mock_sleep: + with unittest.mock.patch('pythonLogs.log_utils.open', side_effect=mock_open_side_effect): + # This should succeed after retry, covering lines 259-261 + result = log_utils.gzip_file_with_sufix(test_file, "retry_coverage") + + # Verify retry was attempted (sleep was called) - line 260 + mock_sleep.assert_called_once_with(0.1) + + # Verify the operation eventually succeeded + assert result is not None + assert result.endswith("_retry_coverage.log.gz") + assert not os.path.exists(test_file) # Original should be deleted + + # Clean up the gzipped file + if result and os.path.exists(result): + os.unlink(result) + + def test_gzip_file_windows_retry_exhausted_coverage(self): + """Test gzip_file_with_sufix when Windows retries are exhausted for coverage.""" + with tempfile.TemporaryDirectory() as temp_dir: + # Create a test file + test_file = os.path.join(temp_dir, "test_retry_fail.log") + with open(test_file, "w") as f: + f.write("test content for retry exhaustion") + + import unittest.mock + + # Mock to always raise PermissionError to exhaust retries + def mock_open_side_effect(*args, **kwargs): + if args[0] == test_file: + # Always fail - simulate persistent Windows file locking + raise PermissionError("The process cannot access the file - persistent lock") + else: + # Other opens work normally + return open(*args, **kwargs) + + # Mock sys.platform to be Windows and capture stderr + with unittest.mock.patch('pythonLogs.log_utils.sys.platform', 'win32'): + with unittest.mock.patch('pythonLogs.log_utils.time.sleep') as mock_sleep: + with unittest.mock.patch('pythonLogs.log_utils.open', side_effect=mock_open_side_effect): + # Capture stderr to verify error logging + stderr_capture = io.StringIO() + with contextlib.redirect_stderr(stderr_capture): + with pytest.raises(PermissionError) as exc_info: + # This should exhaust retries and fail, covering lines 262-264 + log_utils.gzip_file_with_sufix(test_file, "retry_fail") + + # Verify retries were attempted (should be called twice for 3 attempts) + assert mock_sleep.call_count == 2 + + # Verify error was logged to stderr (line 263) + output = stderr_capture.getvalue() + assert "Unable to gzip log file" in output + assert test_file in output + assert "persistent lock" in output + + # Verify the exception was re-raised (line 264) + assert "persistent lock" in str(exc_info.value) diff --git a/tests/core/test_log_utils_windows.py b/tests/core/test_log_utils_windows.py new file mode 100644 index 0000000..8304451 --- /dev/null +++ b/tests/core/test_log_utils_windows.py @@ -0,0 +1,491 @@ +# -*- encoding: utf-8 -*- +""" +Windows-specific tests for log_utils module. + +These tests are designed to run specifically on Windows OS and test +Windows-specific behaviors like file locking, permission models, and +timezone handling differences. +""" +import contextlib +import io +import logging +import os +import sys +import tempfile +import time +import pytest + + +# Add parent directory to path for imports +sys.path.insert(0, os.path.dirname(os.path.dirname(os.path.abspath(__file__)))) +# Add current directory to path for local imports +sys.path.insert(0, os.path.dirname(os.path.abspath(__file__))) + +from pythonLogs import log_utils + +# Import utility functions from the same directory +from test_log_utils import ( + create_windows_safe_temp_file, + safe_close_and_delete_file, + windows_safe_temp_directory +) + + +class TestLogUtilsWindows: + """Windows-specific tests for log_utils functionality.""" + + @pytest.mark.skipif(sys.platform != "win32", reason="Windows-specific tests") + def test_check_directory_permissions_windows(self): + """Test Windows-specific directory permission behavior.""" + with tempfile.TemporaryDirectory() as temp_dir: + # Create a deeply nested path that should trigger directory creation + nested_path = os.path.join(temp_dir, "level1", "level2", "level3", "level4") + + # This should succeed and create the directories (function returns None) + log_utils.check_directory_permissions(nested_path) + assert os.path.exists(nested_path) + + # Test with a path that contains invalid characters (Windows-specific) + try: + invalid_chars_path = os.path.join(temp_dir, "invalid<>:|*?\"path") + # This might raise different exceptions on different Windows versions + with pytest.raises((PermissionError, OSError, ValueError)) as exec_info: + log_utils.check_directory_permissions(invalid_chars_path) + # The specific error message may vary + assert any(phrase in str(exec_info.value).lower() for phrase in + ["unable", "invalid", "permission", "access"]) + except pytest.skip.Exception: + pytest.skip("Windows permission test with invalid characters not applicable") + + @pytest.mark.skipif(sys.platform != "win32", reason="Windows-specific tests") + def test_get_log_path_windows_permissions(self): + """Test Windows-specific permission handling in get_log_path.""" + with tempfile.TemporaryDirectory() as temp_dir: + test_file = "test.log" + # Test 1: Valid directory should return the correct path + result = log_utils.get_log_path(temp_dir, test_file) + assert result == os.path.join(temp_dir, test_file) + + # Test 2: Directory that gets created should work fine + new_dir = os.path.join(temp_dir, "newdir") + result = log_utils.get_log_path(new_dir, test_file) + assert result == os.path.join(new_dir, test_file) + assert os.path.exists(new_dir) # Should have been created + + # Test 3: On Windows, we skip the permission error test since + # chmod doesn't work the same way as Unix systems + pytest.skip("Directory permission test not applicable on Windows") + + @pytest.mark.skipif(sys.platform != "win32", reason="Windows-specific tests") + def test_delete_file_windows_safe(self): + """Test delete_file with Windows-safe file handling.""" + # Import test utilities from the same directory + + # Create a Windows-safe temporary file + file_handle, file_path = create_windows_safe_temp_file(suffix=".log", text=True) + + try: + # Write some content and close the file properly + file_handle.write("test content") + file_handle.close() + + assert os.path.isfile(file_path) == True + log_utils.delete_file(file_path) + assert os.path.isfile(file_path) == False + finally: + # Ensure cleanup if the test fails + if os.path.exists(file_path): + safe_close_and_delete_file(None, file_path) + + @pytest.mark.skipif(sys.platform != "win32", reason="Windows-specific tests") + def test_is_older_than_x_days_windows_safe(self): + """Test is_older_than_x_days with Windows-safe file handling.""" + # Import test utilities from the same directory + + # Create a Windows-safe temporary file + file_handle, file_path = create_windows_safe_temp_file(suffix=".log", text=True) + + try: + # Write some content and close the file properly + file_handle.write("test content") + file_handle.close() + + assert os.path.isfile(file_path) == True + + # When days=1, it compares against 1 day ago, so newly created file should NOT be older + result = log_utils.is_older_than_x_days(file_path, 1) + assert result == False + + # When days=5, it compares against 5 days ago, so newly created file should NOT be older + result = log_utils.is_older_than_x_days(file_path, 5) + assert result == False + + log_utils.delete_file(file_path) + assert os.path.isfile(file_path) == False + finally: + # Ensure cleanup if the test fails + if os.path.exists(file_path): + safe_close_and_delete_file(None, file_path) + + @pytest.mark.skipif(sys.platform != "win32", reason="Windows-specific tests") + def test_gzip_file_with_sufix_windows_safe(self): + """Test gzip_file_with_sufix with Windows-safe file handling.""" + # Create a Windows-safe temporary file + file_handle, file_path = create_windows_safe_temp_file(suffix=".log", text=True) + + try: + # Write some test content and close the file properly + file_handle.write("test content for gzip") + file_handle.close() + + assert os.path.isfile(file_path) == True + sufix = "test1" + result = log_utils.gzip_file_with_sufix(file_path, sufix) + file_path_no_suffix = file_path.split(".")[0] + assert result == f"{file_path_no_suffix}_{sufix}.log.gz" + + # Clean up the gzipped file with Windows-safe deletion + safe_close_and_delete_file(None, result) + assert os.path.isfile(result) == False + + finally: + # Ensure cleanup of the original file if it still exists + if os.path.exists(file_path): + safe_close_and_delete_file(None, file_path) + + + @pytest.mark.skipif(sys.platform != "win32", reason="Windows-specific tests") + def test_gzip_file_windows_retry_mechanism(self): + """Test that gzip_file_with_sufix handles Windows file locking with retry.""" + from unittest.mock import patch + + # Create a Windows-safe temporary file + file_handle, file_path = create_windows_safe_temp_file(suffix=".log", text=True) + + try: + # Write content and close properly + file_handle.write("test content for retry test") + file_handle.close() + + # Test both with and without platform mocking to ensure retry works + for mock_platform in [False, True]: + # Mock time.sleep to verify retry mechanism + with patch('pythonLogs.log_utils.time.sleep') as mock_sleep: + # Mock open to raise PermissionError on first call, succeed on second + call_count = 0 + original_open = open + + def mock_open_side_effect(*args, **kwargs): + nonlocal call_count + call_count += 1 + if call_count == 1: + # First call - simulate Windows file locking + raise PermissionError("Permission denied") + else: + # Subsequent calls - use real open + return original_open(*args, **kwargs) + + context_manager = ( + patch('pythonLogs.log_utils.sys.platform', 'win32') if mock_platform + else patch('pythonLogs.log_utils.open', side_effect=mock_open_side_effect) + ) + + with context_manager: + if mock_platform: + with patch('pythonLogs.log_utils.open', side_effect=mock_open_side_effect): + result = log_utils.gzip_file_with_sufix(file_path, f"retry_test_{mock_platform}") + else: + result = log_utils.gzip_file_with_sufix(file_path, f"retry_test_{mock_platform}") + + # Verify retry was attempted (sleep was called) + mock_sleep.assert_called_once_with(0.1) + + # Verify the operation eventually succeeded + assert result is not None + assert f"retry_test_{mock_platform}" in result + + # Clean up the gzipped file + if result and os.path.exists(result): + safe_close_and_delete_file(None, result) + + # Reset for next iteration + call_count = 0 + + finally: + # Clean up the original file + if os.path.exists(file_path): + safe_close_and_delete_file(None, file_path) + + @pytest.mark.skipif(sys.platform != "win32", reason="Windows-specific tests") + def test_timezone_fallback_windows(self): + """Test timezone fallback behavior on Windows systems.""" + # Test get_format with timezone that may not be available on Windows + show_location = False + name = "test_windows_tz" + timezone = "Australia/Queensland" + result = log_utils.get_format(show_location, name, timezone) + + # On systems without timezone data (common on Windows), this falls back to localtime + # Test should verify format structure rather than hardcoded timezone offset + expected_base_format = f"[%(asctime)s.%(msecs)03d" + assert result.startswith(expected_base_format) + assert f"]:[%(levelname)s]:[{name}]:%(message)s" in result + + # Verify timezone offset is present (either specific timezone or fallback) + import re + # The % characters need to be literal in the regex + offset_pattern = r'\[%\(asctime\)s\.%\(msecs\)03d([+-]\d{4})\]' + match = re.search(offset_pattern, result) + assert match is not None, f"No timezone offset found in format: {result}" + # The offset could be the specific timezone or system localtime fallback + offset = match.group(1) + assert re.match(r'[+-]\d{4}', offset), f"Invalid timezone offset format: {offset}" + + @pytest.mark.skipif(sys.platform != "win32", reason="Windows-specific tests") + def test_windows_timezone_environment_fallback(self): + """Test Windows-specific timezone environment variable fallback.""" + original_tz = os.environ.get("LOG_TIMEZONE") + + try: + # Set an invalid timezone that doesn't exist on Windows + os.environ["LOG_TIMEZONE"] = "Invalid/Windows/Timezone" + log_utils.get_stderr_timezone.cache_clear() + + # This should trigger the exception and fallback to None (local timezone) + result = log_utils.get_stderr_timezone() + assert result is None # Should fall back to None (local timezone) + + # Test that write_stderr still works with fallback + stderr_capture = io.StringIO() + with contextlib.redirect_stderr(stderr_capture): + log_utils.write_stderr("Windows timezone fallback test") + + output = stderr_capture.getvalue() + assert "Windows timezone fallback test" in output + assert "ERROR" in output + finally: + if original_tz is not None: + os.environ["LOG_TIMEZONE"] = original_tz + elif "LOG_TIMEZONE" in os.environ: + del os.environ["LOG_TIMEZONE"] + log_utils.get_stderr_timezone.cache_clear() + + @pytest.mark.skipif(sys.platform != "win32", reason="Windows-specific tests") + def test_windows_file_locking_resilience(self): + """Test that Windows file locking resilience works in production scenarios.""" + # Use Windows-safe temporary directory + with windows_safe_temp_directory() as temp_dir: + test_file = os.path.join(temp_dir, "windows_resilience_test.log") + + # Create file with content + with open(test_file, "w") as f: + f.write("Windows file locking resilience test content\n" * 100) + + # Ensure file is properly closed before gzip operation + assert os.path.isfile(test_file) + + # This should work without issues on Windows + result = log_utils.gzip_file_with_sufix(test_file, "windows_test") + + assert result is not None + assert result.endswith("_windows_test.log.gz") + assert os.path.exists(result) + assert not os.path.exists(test_file) # Original should be deleted + + # Verify compressed content + import gzip + with gzip.open(result, "rt") as f: + content = f.read() + assert "Windows file locking resilience test content" in content + + @pytest.mark.skipif(sys.platform != "win32", reason="Windows-specific tests") + def test_windows_path_handling(self): + """Test Windows-specific path handling behaviors.""" + with tempfile.TemporaryDirectory() as temp_dir: + # Test with Windows-style path separators + windows_style_dir = temp_dir.replace("/", "\\") + test_file = "windows_path_test.log" + + result = log_utils.get_log_path(windows_style_dir, test_file) + expected = os.path.join(windows_style_dir, test_file) + assert result == expected + + # Test with mixed separators (Windows handles this) + mixed_dir = os.path.join(temp_dir, "mixed\\path/test") + result = log_utils.get_log_path(mixed_dir, test_file) + assert os.path.exists(mixed_dir) # Should be created successfully + + @pytest.mark.skipif(sys.platform != "win32", reason="Windows-specific tests") + def test_windows_stderr_timezone_with_dst(self): + """Test Windows timezone handling with DST considerations.""" + original_tz = os.environ.get("LOG_TIMEZONE") + + try: + # Test with a timezone that has DST changes + os.environ["LOG_TIMEZONE"] = "America/New_York" + log_utils.get_stderr_timezone.cache_clear() + + stderr_capture = io.StringIO() + with contextlib.redirect_stderr(stderr_capture): + log_utils.write_stderr("Windows DST timezone test") + + output = stderr_capture.getvalue() + assert "Windows DST timezone test" in output + assert "ERROR" in output + + # Should contain some form of timezone offset + # Windows may fall back to local timezone if specific timezone unavailable + assert any(char in output for char in ['+', '-']) or 'Z' in output + + finally: + if original_tz is not None: + os.environ["LOG_TIMEZONE"] = original_tz + elif "LOG_TIMEZONE" in os.environ: + del os.environ["LOG_TIMEZONE"] + log_utils.get_stderr_timezone.cache_clear() + + @pytest.mark.skipif(sys.platform != "win32", reason="Windows-specific tests") + def test_windows_concurrent_file_operations(self): + """Test concurrent file operations on Windows with proper cleanup.""" + import concurrent.futures + import threading + + results = [] + errors = [] + lock = threading.Lock() + + def windows_file_worker(worker_id): + """Worker that performs Windows-safe file operations.""" + try: + # Create Windows-safe temporary file + file_handle, file_path = create_windows_safe_temp_file( + suffix=f"_worker_{worker_id}.log", text=True + ) + + try: + # Write content and close properly + file_handle.write(f"Content from worker {worker_id}\n" * 10) + file_handle.close() + + # Test file age check + is_old = log_utils.is_older_than_x_days(file_path, 1) + + # Test gzip operation + gzip_result = log_utils.gzip_file_with_sufix(file_path, f"worker_{worker_id}") + + with lock: + results.append({ + 'worker_id': worker_id, + 'file_path': file_path, + 'is_old': is_old, + 'gzip_result': gzip_result + }) + + # Clean up gzipped file + if gzip_result and os.path.exists(gzip_result): + safe_close_and_delete_file(None, gzip_result) + + finally: + # Ensure cleanup of original file if it still exists + if os.path.exists(file_path): + safe_close_and_delete_file(None, file_path) + + except Exception as e: + with lock: + errors.append(f"Worker {worker_id}: {str(e)}") + + # Run concurrent workers + num_workers = 5 + with concurrent.futures.ThreadPoolExecutor(max_workers=num_workers) as executor: + futures = [executor.submit(windows_file_worker, i) for i in range(num_workers)] + for future in concurrent.futures.as_completed(futures): + future.result() + + # Verify results + assert len(errors) == 0, f"Windows concurrent operations failed: {errors}" + assert len(results) == num_workers + + # Verify all workers completed successfully + for result in results: + assert result['is_old'] == False # Files should NOT be considered "old" (created recently) + assert result['gzip_result'] is not None + assert f"worker_{result['worker_id']}" in result['gzip_result'] + + + + @pytest.mark.skipif(sys.platform != "win32", reason="Windows-specific tests") + def test_gzip_file_windows_permission_error(self): + """Test gzip_file_with_sufix Windows-specific permission handling.""" + # This test replaces the Unix chmod-based test for Windows + # Windows file locking is handled differently than Unix permissions + + + # Create a Windows-safe temporary file + file_handle, file_path = create_windows_safe_temp_file(suffix=".log", text=True) + + try: + # Write content and close properly + file_handle.write("test content for Windows permission test") + file_handle.close() + + # On Windows, we test the file locking retry mechanism instead of chmod + # This is more realistic for Windows environments + result = log_utils.gzip_file_with_sufix(file_path, "windows_perm_test") + + # Should succeed on Windows with proper file handling + assert result is not None + assert result.endswith("_windows_perm_test.log.gz") + assert not os.path.exists(file_path) # Original should be deleted + + # Clean up the gzipped file + if result and os.path.exists(result): + safe_close_and_delete_file(None, result) + + finally: + # Ensure cleanup of original file if it still exists + if os.path.exists(file_path): + safe_close_and_delete_file(None, file_path) + + @pytest.mark.skipif(sys.platform != "win32", reason="Windows-specific tests") + def test_get_log_path_windows_comprehensive(self): + """Comprehensive test for get_log_path Windows-specific behaviors.""" + with tempfile.TemporaryDirectory() as temp_dir: + test_file = "test.log" + + # Test 1: Valid directory should return the correct path + result = log_utils.get_log_path(temp_dir, test_file) + assert result == os.path.join(temp_dir, test_file) + + # Test 2: Directory that gets created should work fine + new_dir = os.path.join(temp_dir, "newdir") + result = log_utils.get_log_path(new_dir, test_file) + assert result == os.path.join(new_dir, test_file) + assert os.path.exists(new_dir) # Should have been created + + # Test 3: Test with invalid path characters (Windows-specific) + try: + invalid_path = os.path.join(temp_dir, "invalid<>path") + result = log_utils.get_log_path(invalid_path, "test.log") + assert "test.log" in result + except (PermissionError, OSError, ValueError): + # Expected on Windows with invalid characters + pass + + # Test 4: Test normal operation in created directory + test_dir = os.path.join(temp_dir, "test_write_perm") + os.makedirs(test_dir) + result = log_utils.get_log_path(test_dir, "test.log") + assert result == os.path.join(test_dir, "test.log") + + # Test 5: Windows-specific long path names (Windows limitation) + long_filename = "a" * 200 + ".log" # Very long filename + try: + result = log_utils.get_log_path(test_dir, long_filename) + assert long_filename in result + except (OSError, PermissionError): + # Expected on some Windows systems with path length limitations + pass + + +if __name__ == "__main__": + pytest.main([__file__]) diff --git a/tests/factory/test_enums.py b/tests/factory/test_enums.py index a3e0215..70bd8c0 100644 --- a/tests/factory/test_enums.py +++ b/tests/factory/test_enums.py @@ -40,7 +40,8 @@ def test_log_level_enum_usage(self): def test_rotate_when_enum_usage(self): """Test RotateWhen enum usage.""" - with tempfile.TemporaryDirectory() as temp_dir: + temp_dir = tempfile.mkdtemp() + try: logger = timed_rotating_logger( name="rotate_test", directory=temp_dir, @@ -48,6 +49,17 @@ def test_rotate_when_enum_usage(self): when=RotateWhen.MIDNIGHT # RotateWhen enum ) assert logger.name == "rotate_test" + + finally: + # Ensure proper cleanup on Windows before directory removal + clear_logger_registry() + # Small delay on Windows to ensure file handles are released + if sys.platform == "win32": + import time + time.sleep(0.1) + # Manual cleanup + import shutil + shutil.rmtree(temp_dir, ignore_errors=True) def test_mixed_enum_and_string_usage(self): """Test mixed enum and string usage.""" @@ -98,7 +110,8 @@ def test_all_rotate_when_enum_values(self): def test_enum_string_conversion(self): """Test that enums are properly converted to strings internally.""" - with tempfile.TemporaryDirectory() as temp_dir: + temp_dir = tempfile.mkdtemp() + try: # Create logger with enums logger = LoggerFactory.create_logger( LoggerType.TIMED_ROTATING, @@ -111,10 +124,22 @@ def test_enum_string_conversion(self): # Verify logger was created successfully assert logger.name == "conversion_test" assert logger.level == 40 # ERROR level + + finally: + # Ensure proper cleanup on Windows before directory removal + clear_logger_registry() + # Small delay on Windows to ensure file handles are released + if sys.platform == "win32": + import time + time.sleep(0.1) + # Manual cleanup + import shutil + shutil.rmtree(temp_dir, ignore_errors=True) def test_backward_compatibility_with_strings(self): """Test that string values still work alongside enums.""" - with tempfile.TemporaryDirectory() as temp_dir: + temp_dir = tempfile.mkdtemp() + try: # Mix of enums and strings logger = timed_rotating_logger( name="compat_test", @@ -125,6 +150,17 @@ def test_backward_compatibility_with_strings(self): assert logger.name == "compat_test" assert logger.level == 20 # INFO level + + finally: + # Ensure proper cleanup on Windows before directory removal + clear_logger_registry() + # Small delay on Windows to ensure file handles are released + if sys.platform == "win32": + import time + time.sleep(0.1) + # Manual cleanup + import shutil + shutil.rmtree(temp_dir, ignore_errors=True) def test_logger_type_enum_values(self): """Test LoggerType enum values.""" diff --git a/tests/factory/test_factory.py b/tests/factory/test_factory.py index 6a4e1e5..2eb3896 100644 --- a/tests/factory/test_factory.py +++ b/tests/factory/test_factory.py @@ -9,7 +9,6 @@ # Add parent directory to path for imports sys.path.insert(0, os.path.dirname(os.path.dirname(os.path.abspath(__file__)))) - from pythonLogs import ( LoggerFactory, LoggerType, @@ -335,8 +334,8 @@ def test_factory_ensure_initialized_behavior(self): def test_factory_atexit_cleanup_error_handling(self): """Test atexit cleanup error handling.""" - from unittest.mock import patch, Mock - + from unittest.mock import patch + # Mock the clear_registry method to raise an error with patch.object(LoggerFactory, 'clear_registry', side_effect=Exception("Test error")): # Should not raise an exception diff --git a/tests/factory/test_factory_examples.py b/tests/factory/test_factory_examples.py index 8774d3e..df607e5 100644 --- a/tests/factory/test_factory_examples.py +++ b/tests/factory/test_factory_examples.py @@ -192,11 +192,20 @@ def test_error_handling_scenarios(self): create_logger("nonexistent_type", name="error_test") # Invalid directory (should raise PermissionError when trying to create) - with pytest.raises(PermissionError): - size_rotating_logger( - name="permission_test", - directory="/invalid/directory/path" - ) + # This test only works on Unix/Linux/macOS systems with chmod + if sys.platform != "win32": + with tempfile.TemporaryDirectory() as temp_dir: + readonly_parent = os.path.join(temp_dir, "readonly") + os.makedirs(readonly_parent, mode=0o555) # Read-only parent + try: + invalid_dir = os.path.join(readonly_parent, "invalid") + with pytest.raises(PermissionError): + size_rotating_logger( + name="permission_test", + directory=invalid_dir + ) + finally: + os.chmod(readonly_parent, 0o755) # Restore permissions for cleanup def test_logger_customization_example(self): """Test logger with extensive customization.""" diff --git a/tests/factory/test_string_levels.py b/tests/factory/test_string_levels.py index 1c957b4..6e12ae4 100644 --- a/tests/factory/test_string_levels.py +++ b/tests/factory/test_string_levels.py @@ -25,17 +25,35 @@ class TestStringLevels: @pytest.fixture(autouse=True) def setup_temp_dir(self): """Set up test fixtures before each test method.""" + + # Import test utilities + from tests.core.test_log_utils import cleanup_all_loggers, safe_delete_directory + # Clear any existing loggers + cleanup_all_loggers() clear_logger_registry() - # Create temporary directory for log files using context manager - with tempfile.TemporaryDirectory() as temp_dir: - self.temp_dir = temp_dir - self.log_file = "string_test.log" - yield + # Create temporary directory for log files + self.temp_dir_obj = tempfile.TemporaryDirectory() + self.temp_dir = self.temp_dir_obj.__enter__() + self.log_file = "string_test.log" - # Clear registry after each test - clear_logger_registry() + yield + + try: + # Clean up all loggers and their handlers before directory deletion + cleanup_all_loggers() + clear_logger_registry() + + # Ensure temporary directory is properly cleaned up + self.temp_dir_obj.__exit__(None, None, None) + except (OSError, PermissionError): + # On Windows, if normal cleanup fails, use safe deletion + try: + safe_delete_directory(self.temp_dir) + except: + # If all else fails, just log the issue + print(f"Warning: Could not clean up temporary directory {self.temp_dir}") def test_basic_logger_string_levels(self): """Test BasicLog with string levels.""" diff --git a/tests/performance/test_memory_optimization.py b/tests/performance/test_memory_optimization.py index 425a5c6..6872b1f 100644 --- a/tests/performance/test_memory_optimization.py +++ b/tests/performance/test_memory_optimization.py @@ -428,9 +428,9 @@ def test_cleanup_logger_handlers_standalone(self): cleanup_logger_handlers(logger) assert len(logger.handlers) == 0 finally: - # Clean up temporary file - if os.path.exists(temp_filename): - os.unlink(temp_filename) + # Clean up temporary file with Windows-compatible deletion + from tests.core.test_log_utils import safe_close_and_delete_file + safe_close_and_delete_file(handler2, temp_filename) def test_cleanup_logger_handlers_error_handling(self): """Test cleanup_logger_handlers with handler errors.""" @@ -586,8 +586,7 @@ def test_formatter_cache_thread_safety(self): """Test thread safety of formatter cache operations.""" from pythonLogs.memory_utils import get_cached_formatter, clear_formatter_cache import concurrent.futures - import threading - + clear_formatter_cache() errors = [] created_formatters = [] @@ -619,8 +618,7 @@ def formatter_worker(worker_id): def test_weak_reference_cleanup_mechanism(self): """Test weak reference cleanup mechanism without relying on GC timing.""" from pythonLogs.memory_utils import get_active_logger_count, _active_loggers, _weak_ref_lock - import weakref - + # Test the cleanup detection logic in get_active_logger_count with _weak_ref_lock: initial_size = len(_active_loggers) diff --git a/tests/performance/test_performance.py b/tests/performance/test_performance.py index c78b12d..05c9dcf 100644 --- a/tests/performance/test_performance.py +++ b/tests/performance/test_performance.py @@ -8,7 +8,11 @@ # Add parent directory to path for imports -sys.path.insert(0, os.path.dirname(os.path.dirname(os.path.abspath(__file__)))) +project_root = os.path.dirname(os.path.dirname(os.path.dirname(os.path.abspath(__file__)))) +sys.path.insert(0, project_root) # For pythonLogs + +# Import test utilities +from tests.core.test_log_utils import get_safe_timezone from pythonLogs import ( LoggerFactory, @@ -105,11 +109,12 @@ def test_timezone_function_caching(self): # Create multiple loggers with same timezone start_time = time.time() + safe_tz = get_safe_timezone() loggers = [] for i in range(20): logger = basic_logger( name=f"tz_test_{i}", - timezone="UTC" # Same timezone should use cached function + timezone=safe_tz # Same timezone should use cached function ) loggers.append(logger) diff --git a/tests/performance/test_performance_zoneinfo.py b/tests/performance/test_performance_zoneinfo.py index 3d00b0f..8b303d7 100644 --- a/tests/performance/test_performance_zoneinfo.py +++ b/tests/performance/test_performance_zoneinfo.py @@ -26,10 +26,10 @@ def setup_method(self): clear_logger_registry() # Clear timezone caches - from pythonLogs.log_utils import get_timezone_function, _get_timezone_offset, _get_stderr_timezone + from pythonLogs.log_utils import get_timezone_function, get_timezone_offset, get_stderr_timezone get_timezone_function.cache_clear() - _get_timezone_offset.cache_clear() - _get_stderr_timezone.cache_clear() + get_timezone_offset.cache_clear() + get_stderr_timezone.cache_clear() def test_timezone_function_caching_performance(self): """Test that timezone function caching improves performance.""" @@ -53,12 +53,12 @@ def test_timezone_function_caching_performance(self): def test_timezone_offset_caching_performance(self): """Test timezone offset calculation caching performance.""" - from pythonLogs.log_utils import _get_timezone_offset + from pythonLogs.log_utils import get_timezone_offset # Test with multiple calls to the same timezone start_time = time.time() for _ in range(100): - _get_timezone_offset("UTC") # Should be cached after first call + get_timezone_offset("UTC") # Should be cached after first call cached_time = time.time() - start_time # Should complete very quickly due to caching diff --git a/tests/timezone/test_timezone_migration.py b/tests/timezone/test_timezone_migration.py index bae2710..fe372cd 100644 --- a/tests/timezone/test_timezone_migration.py +++ b/tests/timezone/test_timezone_migration.py @@ -3,11 +3,14 @@ import os import sys import tempfile -import pytest # Add parent directory to path for imports -sys.path.insert(0, os.path.dirname(os.path.dirname(os.path.abspath(__file__)))) +project_root = os.path.dirname(os.path.dirname(os.path.dirname(os.path.abspath(__file__)))) +sys.path.insert(0, project_root) # For pythonLogs + +# Import test utilities +from tests.core.test_log_utils import requires_zoneinfo_utc from pythonLogs import ( basic_logger, @@ -21,9 +24,9 @@ ) from pythonLogs.log_utils import ( get_timezone_function, - _get_timezone_offset, + get_timezone_offset, write_stderr, - _get_stderr_timezone, + get_stderr_timezone, ) @@ -34,6 +37,7 @@ def setup_method(self): """Clear registry before each test.""" clear_logger_registry() + @requires_zoneinfo_utc def test_zoneinfo_import_success(self): """Test that ZoneInfo is properly imported.""" from pythonLogs.log_utils import ZoneInfo @@ -42,6 +46,7 @@ def test_zoneinfo_import_success(self): utc_tz = ZoneInfo("UTC") assert utc_tz is not None + @requires_zoneinfo_utc def test_utc_timezone_basic_logger(self): """Test UTC timezone with basic logger.""" logger = basic_logger( @@ -119,21 +124,27 @@ def test_timezone_factory_pattern(self): def test_invalid_timezone_handling(self): """Test handling of invalid timezone names.""" - # Should handle invalid timezone gracefully - with pytest.raises(Exception): # ZoneInfoNotFoundError or similar - basic_logger( - name="invalid_tz_test", - timezone="Invalid/Timezone" - ) + # With the new fallback system, invalid timezones should fall back to localtime + # instead of raising exceptions, making the system more robust + logger = basic_logger( + name="invalid_tz_test", + timezone="Invalid/Timezone" # This should now fall back to localtime + ) + # Logger should be created successfully with fallback + assert logger.name == "invalid_tz_test" + logger.info("Test message with invalid timezone") def test_timezone_offset_calculation(self): """Test timezone offset calculation function.""" - # Test UTC - utc_offset = _get_timezone_offset("UTC") - assert utc_offset == "+0000" + # Test UTC (may fall back to localtime on systems without UTC data) + utc_offset = get_timezone_offset("UTC") + # UTC should return +0000, but may fall back to localtime on Windows + assert isinstance(utc_offset, str) + assert len(utc_offset) == 5 + assert utc_offset[0] in ['+', '-'] # Test localtime - local_offset = _get_timezone_offset("localtime") + local_offset = get_timezone_offset("localtime") assert len(local_offset) == 5 # Format: ±HHMM assert local_offset[0] in ['+', '-'] @@ -150,10 +161,10 @@ def test_timezone_function_caching(self): def test_timezone_function_types(self): """Test different timezone function types.""" - # UTC should return gmtime + # UTC may fall back to localtime on systems without UTC timezone data utc_func = get_timezone_function("UTC") import time - assert utc_func is time.gmtime + assert utc_func in [time.gmtime, time.localtime] # Localtime should return localtime local_func = get_timezone_function("localtime") @@ -186,10 +197,10 @@ def test_stderr_timezone_functionality(self): def test_stderr_timezone_caching(self): """Test that stderr timezone is cached.""" # First call - tz1 = _get_stderr_timezone() + tz1 = get_stderr_timezone() # Second call should return cached result - tz2 = _get_stderr_timezone() + tz2 = get_stderr_timezone() # Should be the same object (cached) assert tz1 is tz2 diff --git a/tests/timezone/test_zoneinfo_fallbacks.py b/tests/timezone/test_zoneinfo_fallbacks.py index 3aa726d..2d9d206 100644 --- a/tests/timezone/test_zoneinfo_fallbacks.py +++ b/tests/timezone/test_zoneinfo_fallbacks.py @@ -8,7 +8,11 @@ # Add parent directory to path for imports -sys.path.insert(0, os.path.dirname(os.path.dirname(os.path.abspath(__file__)))) +project_root = os.path.dirname(os.path.dirname(os.path.dirname(os.path.abspath(__file__)))) +sys.path.insert(0, project_root) # For pythonLogs + +# Import test utilities +from tests.core.test_log_utils import skip_if_no_zoneinfo_utc, get_safe_timezone, requires_zoneinfo_utc class TestZoneinfoFallbacks: @@ -27,30 +31,36 @@ def test_timezone_error_handling(self): """Test proper error handling for timezone operations.""" from pythonLogs import basic_logger, LogLevel - # Test with invalid timezone - with pytest.raises(Exception): # Should raise ZoneInfoNotFoundError or similar - basic_logger( - name="error_test", - timezone="NonExistent/Timezone", - level=LogLevel.INFO - ) + # With the new fallback system, invalid timezones should gracefully fall back + # to localtime instead of raising exceptions for better robustness + logger = basic_logger( + name="error_test", + timezone="NonExistent/Timezone", # Should fall back to localtime + level=LogLevel.INFO + ) + # Logger should be created successfully with fallback + assert logger.name == "error_test" + logger.info("Test message with fallback timezone") def test_timezone_offset_edge_cases(self): """Test timezone offset calculation for edge cases.""" - from pythonLogs.log_utils import _get_timezone_offset + from pythonLogs.log_utils import get_timezone_offset - # Test UTC (should always work) - utc_offset = _get_timezone_offset("UTC") - assert utc_offset == "+0000" + # Test UTC (may fall back to localtime on systems without UTC data) + utc_offset = get_timezone_offset("UTC") + # UTC should return +0000, but may fall back to localtime on Windows + assert isinstance(utc_offset, str) + assert len(utc_offset) == 5 + assert utc_offset[0] in ['+', '-'] # Test localtime (should work on any system) - local_offset = _get_timezone_offset("localtime") + local_offset = get_timezone_offset("localtime") assert isinstance(local_offset, str) assert len(local_offset) == 5 assert local_offset[0] in ['+', '-'] # Test case insensitivity for localtime - local_offset_upper = _get_timezone_offset("LOCALTIME") + local_offset_upper = get_timezone_offset("LOCALTIME") assert local_offset_upper == local_offset def test_stderr_timezone_fallback(self): @@ -74,16 +84,16 @@ def test_timezone_function_fallback(self): from pythonLogs.log_utils import get_timezone_function import time - # Test standard cases + # Test standard cases - UTC may fall back to localtime on systems without UTC data utc_func = get_timezone_function("UTC") - assert utc_func is time.gmtime + assert utc_func in [time.gmtime, time.localtime] local_func = get_timezone_function("localtime") assert local_func is time.localtime - # Test case insensitivity + # Test case insensitivity - UTC may fall back to localtime utc_func_upper = get_timezone_function("utc") - assert utc_func_upper is time.gmtime + assert utc_func_upper in [time.gmtime, time.localtime] local_func_upper = get_timezone_function("LOCALTIME") assert local_func_upper is time.localtime @@ -92,10 +102,11 @@ def test_logger_creation_with_fallback_timezone(self): """Test logger creation when timezone operations might fail.""" from pythonLogs import basic_logger, LogLevel - # These should all work with proper fallback + # Use safe timezone that works on all platforms + safe_tz = get_safe_timezone() logger = basic_logger( name="fallback_test", - timezone="UTC", + timezone=safe_tz, level=LogLevel.INFO ) @@ -133,7 +144,7 @@ def test_complex_timezone_scenarios(self): def test_zoneinfo_caching_behavior(self): """Test that zoneinfo objects are properly cached.""" - from pythonLogs.log_utils import get_timezone_function, _get_timezone_offset + from pythonLogs.log_utils import get_timezone_function, get_timezone_offset # Test function caching func1 = get_timezone_function("America/Chicago") @@ -141,8 +152,8 @@ def test_zoneinfo_caching_behavior(self): assert func1 is func2 # Should be cached # Test offset caching - offset1 = _get_timezone_offset("America/Chicago") - offset2 = _get_timezone_offset("America/Chicago") + offset1 = get_timezone_offset("America/Chicago") + offset2 = get_timezone_offset("America/Chicago") assert offset1 == offset2 # Should be cached def test_environment_variable_timezone_handling(self): @@ -151,12 +162,12 @@ def test_environment_variable_timezone_handling(self): # Test with environment variable with patch.dict(os.environ, {'LOG_TIMEZONE': 'Europe/Paris'}): # Environment variable should be used for stderr - from pythonLogs.log_utils import _get_stderr_timezone + from pythonLogs.log_utils import get_stderr_timezone # Clear cache to test new environment - _get_stderr_timezone.cache_clear() + get_stderr_timezone.cache_clear() - tz = _get_stderr_timezone() + tz = get_stderr_timezone() assert tz is not None def test_concurrent_timezone_access(self): @@ -164,6 +175,8 @@ def test_concurrent_timezone_access(self): import threading from pythonLogs import basic_logger, LogLevel + # Use safe timezone that works on all platforms + safe_tz = get_safe_timezone() results = [] errors = [] @@ -171,7 +184,7 @@ def create_logger_worker(worker_id): try: logger = basic_logger( name=f"concurrent_test_{worker_id}", - timezone="UTC", + timezone=safe_tz, level=LogLevel.INFO ) logger.info(f"Concurrent test message {worker_id}") @@ -194,6 +207,7 @@ def create_logger_worker(worker_id): assert len(errors) == 0, f"Errors occurred: {errors}" assert len(results) == 10 + @requires_zoneinfo_utc def test_memory_usage_with_timezone_caching(self): """Test that timezone caching doesn't cause memory leaks.""" from pythonLogs import basic_logger, clear_logger_registry @@ -211,9 +225,10 @@ def test_memory_usage_with_timezone_caching(self): # Should complete without memory issues - test passes if no exception is raised + @requires_zoneinfo_utc def test_timezone_validation_edge_cases(self): """Test timezone validation for various edge cases.""" - from pythonLogs.log_utils import _get_timezone_offset + from pythonLogs.log_utils import get_timezone_offset # Test case variations (timezone names are case-sensitive except for localtime) test_cases = [ @@ -223,7 +238,7 @@ def test_timezone_validation_edge_cases(self): ] for tz_input, expected in test_cases: - result = _get_timezone_offset(tz_input) + result = get_timezone_offset(tz_input) if expected is not None: assert result == expected else: @@ -232,6 +247,9 @@ def test_timezone_validation_edge_cases(self): assert len(result) == 5 assert result[0] in ['+', '-'] - # Test that invalid timezone names raise appropriate errors - with pytest.raises(Exception): # Should raise ZoneInfoNotFoundError - _get_timezone_offset("invalid_timezone") + # Test that invalid timezone names now fall back gracefully to localtime + result = get_timezone_offset("invalid_timezone") + # Should fall back to localtime format + assert isinstance(result, str) + assert len(result) == 5 + assert result[0] in ['+', '-']