17 changes: 10 additions & 7 deletions docs/guides/storage_clients.mdx
@@ -28,7 +28,7 @@ Crawlee provides three main storage client implementations:

- <ApiLink to="class/FileSystemStorageClient">`FileSystemStorageClient`</ApiLink> - Provides persistent file system storage with in-memory caching.
- <ApiLink to="class/MemoryStorageClient">`MemoryStorageClient`</ApiLink> - Stores data in memory with no persistence.
- <ApiLink to="class/SqlStorageClient">`SqlStorageClient`</ApiLink> - Provides persistent storage using a SQL database ([SQLite](https://sqlite.org/) or [PostgreSQL](https://www.postgresql.org/)). Requires installing the extra dependency: `crawlee[sql_sqlite]` for SQLite or `crawlee[sql_postgres]` for PostgreSQL.
- <ApiLink to="class/SqlStorageClient">`SqlStorageClient`</ApiLink> - Provides persistent storage using a SQL database ([SQLite](https://sqlite.org/), [PostgreSQL](https://www.postgresql.org/), [MySQL](https://www.mysql.com/) or [MariaDB](https://mariadb.org/)). Requires installing the extra dependency: `crawlee[sql_sqlite]` for SQLite, `crawlee[sql_postgres]` for PostgreSQL or `crawlee[sql_mysql]` for MySQL and MariaDB.
- <ApiLink to="class/RedisStorageClient">`RedisStorageClient`</ApiLink> - Provides persistent storage using a [Redis](https://redis.io/) database v8.0+. Requires installing the extra dependency `crawlee[redis]`.
- [`ApifyStorageClient`](https://docs.apify.com/sdk/python/reference/class/ApifyStorageClient) - Manages storage on the [Apify platform](https://apify.com), implemented in the [Apify SDK](https://github.com/apify/apify-sdk-python).

@@ -144,7 +144,7 @@ The `MemoryStorageClient` does not persist data between runs. All data is lost w
The `SqlStorageClient` is experimental. Its API and behavior may change in future releases.
:::

The <ApiLink to="class/SqlStorageClient">`SqlStorageClient`</ApiLink> provides persistent storage using a SQL database (SQLite by default, or PostgreSQL). It supports all Crawlee storage types and enables concurrent access from multiple independent clients or processes.
The <ApiLink to="class/SqlStorageClient">`SqlStorageClient`</ApiLink> provides persistent storage using a SQL database (SQLite by default, or PostgreSQL, MySQL, MariaDB). It supports all Crawlee storage types and enables concurrent access from multiple independent clients or processes.

:::note dependencies
The <ApiLink to="class/SqlStorageClient">`SqlStorageClient`</ApiLink> is not included in the core Crawlee package.
@@ -154,10 +154,12 @@ To use it, you need to install Crawlee with the appropriate extra dependency:
<code>pip install 'crawlee[sql_sqlite]'</code>
- For PostgreSQL support, run:
<code>pip install 'crawlee[sql_postgres]'</code>
- For MySQL or MariaDB support, run:
<code>pip install 'crawlee[sql_mysql]'</code>
:::

By default, <ApiLink to="class/SqlStorageClient">SqlStorageClient</ApiLink> uses SQLite.
To use PostgreSQL instead, just provide a PostgreSQL connection string via the `connection_string` parameter. No other code changes are needed—the same client works for both databases.
To use a different database, just provide the appropriate connection string via the `connection_string` parameter. No other code changes are needed—the same client works for all supported databases.
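A minimal sketch of the switch (assuming `SqlStorageClient` is imported from `crawlee.storage_clients` like the other storage clients; the connection strings below use placeholder credentials):

```python
from crawlee.storage_clients import SqlStorageClient

# Default: a SQLite database created in the Configuration storage directory.
sqlite_client = SqlStorageClient()

# Same client, different backend: pass a SQLAlchemy async connection string
# (placeholder credentials shown).
postgres_client = SqlStorageClient(
    connection_string='postgresql+asyncpg://user:password@localhost:5432/crawlee',
)
mysql_client = SqlStorageClient(
    connection_string='mysql+aiomysql://user:password@localhost:3306/crawlee',
)
```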

<RunnableCodeBlock className="language-python" language="python">
{SQLStorageClientBasicExample}
@@ -214,7 +216,6 @@ class dataset_metadata_buffer {
+ id (PK)
+ accessed_at
+ modified_at
+ dataset_id (FK)
+ delta_item_count
}

@@ -247,7 +248,6 @@ class key_value_store_metadata_buffer {
+ id (PK)
+ accessed_at
+ modified_at
+ key_value_store_id (FK)
}

%% ========================
@@ -321,7 +321,6 @@ class request_queue_metadata_buffer {
+ id (PK)
+ accessed_at
+ modified_at
+ request_queue_id (FK)
+ client_id
+ delta_handled_count
+ delta_pending_count
@@ -346,11 +345,15 @@ Configuration options for the <ApiLink to="class/SqlStorageClient">`SqlStorageCl

Configuration options for the <ApiLink to="class/SqlStorageClient">`SqlStorageClient`</ApiLink> can be set via constructor arguments:

- **`connection_string`** (default: SQLite in <ApiLink to="class/Configuration">`Configuration`</ApiLink> storage dir) - SQLAlchemy connection string, e.g. `sqlite+aiosqlite:///my.db` or `postgresql+asyncpg://user:pass@host/db`.
- **`connection_string`** (default: SQLite in <ApiLink to="class/Configuration">`Configuration`</ApiLink> storage dir) - SQLAlchemy connection string, e.g. `sqlite+aiosqlite:///my.db`, `postgresql+asyncpg://user:pass@host/db`, `mysql+aiomysql://user:pass@host/db` or `mariadb+aiomysql://user:pass@host/db`.
- **`engine`** - Pre-configured SQLAlchemy AsyncEngine (optional).

For advanced scenarios, you can configure <ApiLink to="class/SqlStorageClient">`SqlStorageClient`</ApiLink> with a custom SQLAlchemy engine and additional options via the <ApiLink to="class/Configuration">`Configuration`</ApiLink> class. This is useful, for example, when connecting to an external PostgreSQL database or customizing connection pooling.

:::warning
If you use MySQL or MariaDB, pass the `isolation_level='READ COMMITTED'` argument to `create_async_engine`. MySQL/MariaDB default to the `REPEATABLE READ` isolation level, which can cause unnecessary locking, deadlocks, or stale reads when multiple Crawlee workers access the same tables concurrently. Using `READ COMMITTED` ensures more predictable row-level locking and visibility semantics for `SqlStorageClient`.
:::
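For MySQL or MariaDB, a minimal sketch of such an engine (connection details are placeholders; the configuration example below shows the full pattern):

```python
from sqlalchemy.ext.asyncio import create_async_engine

from crawlee.storage_clients import SqlStorageClient

# Placeholder credentials; point this at your own MySQL/MariaDB server.
engine = create_async_engine(
    'mysql+aiomysql://user:password@localhost:3306/crawlee',
    # Override the REPEATABLE READ default to avoid unnecessary locking and
    # deadlocks when several Crawlee workers share the same tables.
    isolation_level='READ COMMITTED',
    pool_pre_ping=True,
)

storage_client = SqlStorageClient(engine=engine)
```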

<CodeBlock className="language-python" language="python">
{SQLStorageClientConfigurationExample}
</CodeBlock>
5 changes: 5 additions & 0 deletions pyproject.toml
@@ -81,6 +81,11 @@ sql_sqlite = [
"sqlalchemy[asyncio]>=2.0.0,<3.0.0",
"aiosqlite>=0.21.0",
]
sql_mysql = [
"sqlalchemy[asyncio]>=2.0.0,<3.0.0",
"aiomysql>=0.3.2",
"cryptography>=46.0.5",
]
redis = ["redis[hiredis] >= 7.0.0"]

[project.scripts]
26 changes: 23 additions & 3 deletions src/crawlee/storage_clients/_sql/_client_mixin.py
@@ -8,9 +8,10 @@

from sqlalchemy import CursorResult, delete, select, text, update
from sqlalchemy import func as sql_func
from sqlalchemy.dialects.mysql import insert as mysql_insert
from sqlalchemy.dialects.postgresql import insert as pg_insert
from sqlalchemy.dialects.sqlite import insert as lite_insert
from sqlalchemy.exc import SQLAlchemyError
from sqlalchemy.exc import OperationalError, SQLAlchemyError

from crawlee._utils.crypto import crypto_random_object_id

@@ -227,6 +228,9 @@ def _build_insert_stmt_with_ignore(
if dialect == 'sqlite':
return lite_insert(table_model).values(insert_values).on_conflict_do_nothing()

if dialect in {'mysql', 'mariadb'}:
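# Renders roughly as `INSERT IGNORE INTO ... VALUES (...)`: rows that would
# violate a unique constraint are skipped silently rather than raising an error,
# mirroring the `on_conflict_do_nothing()` branch above.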
return mysql_insert(table_model).values(insert_values).prefix_with('IGNORE')

raise NotImplementedError(f'Insert with ignore not supported for dialect: {dialect}')

def _build_upsert_stmt(
@@ -260,6 +264,11 @@ def _build_upsert_stmt(
set_ = {col: getattr(lite_stmt.excluded, col) for col in update_columns}
return lite_stmt.on_conflict_do_update(index_elements=conflict_cols, set_=set_)

if dialect in {'mysql', 'mariadb'}:
mysql_stmt = mysql_insert(table_model).values(insert_values)
set_ = {col: getattr(mysql_stmt.inserted, col) for col in update_columns}
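# Renders roughly as `INSERT ... ON DUPLICATE KEY UPDATE col = VALUES(col), ...`;
# `mysql_stmt.inserted` refers to the row values from the attempted insert,
# analogous to `excluded` in the SQLite branch above.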
return mysql_stmt.on_duplicate_key_update(**set_)

raise NotImplementedError(f'Upsert not supported for dialect: {dialect}')

async def _purge(self, metadata_kwargs: MetadataUpdateParams) -> None:
@@ -402,11 +411,12 @@ async def _try_acquire_buffer_lock(self, session: AsyncSession) -> bool:
Returns:
True if lock was acquired, False if already locked by another process.
"""
capture_error_code = 1020 # MariaDB error code for "Record has changed since last read"
now = datetime.now(timezone.utc)
lock_until = now + self._BLOCK_BUFFER_TIME
dialect = self._storage_client.get_dialect_name()

if dialect == 'postgresql':
if dialect in {'postgresql', 'mysql', 'mariadb'}:
select_stmt = (
select(self._METADATA_TABLE)
.where(
@@ -417,7 +427,17 @@
)
.with_for_update(skip_locked=True)
)
result = await session.execute(select_stmt)

try:
result = await session.execute(select_stmt)
except OperationalError as e:
# MariaDB raises error 1020 ("Record has changed since last read") instead of
# silently skipping locked rows like MySQL/PostgreSQL. Treat it as lock not acquired.
error_code = getattr(e.orig, 'args', [None])[0]
if error_code == capture_error_code:
return False
raise

metadata_row = result.scalar_one_or_none()

if metadata_row is None:
50 changes: 25 additions & 25 deletions src/crawlee/storage_clients/_sql/_request_queue_client.py
@@ -335,33 +335,33 @@ async def add_batch_of_requests(
)
)

if insert_values:
if forefront:
# If the request already exists in the database, we update the sequence_number by shifting request
# to the left.
upsert_stmt = self._build_upsert_stmt(
self._ITEM_TABLE,
insert_values,
update_columns=['sequence_number'],
conflict_cols=['request_id', 'request_queue_id'],
)
result = await session.execute(upsert_stmt)
else:
# If the request already exists in the database, we ignore this request when inserting.
insert_stmt_with_ignore = self._build_insert_stmt_with_ignore(self._ITEM_TABLE, insert_values)
result = await session.execute(insert_stmt_with_ignore)
try:
if insert_values:
if forefront:
# If the request already exists in the database, we update the sequence_number
# by shifting request to the left.
upsert_stmt = self._build_upsert_stmt(
self._ITEM_TABLE,
insert_values,
update_columns=['sequence_number'],
conflict_cols=['request_id', 'request_queue_id'],
)
result = await session.execute(upsert_stmt)
else:
# If the request already exists in the database, we ignore this request when inserting.
insert_stmt_with_ignore = self._build_insert_stmt_with_ignore(self._ITEM_TABLE, insert_values)
result = await session.execute(insert_stmt_with_ignore)

result = cast('CursorResult', result) if not isinstance(result, CursorResult) else result
approximate_new_request += result.rowcount
result = cast('CursorResult', result) if not isinstance(result, CursorResult) else result
approximate_new_request += result.rowcount

await self._add_buffer_record(
session,
update_modified_at=True,
delta_pending_request_count=approximate_new_request,
delta_total_request_count=approximate_new_request,
)
await self._add_buffer_record(
session,
update_modified_at=True,
delta_pending_request_count=approximate_new_request,
delta_total_request_count=approximate_new_request,
)

try:
await session.commit()
processed_requests.extend(transaction_processed_requests)
except SQLAlchemyError as e:
@@ -433,7 +433,7 @@ async def fetch_next_request(self) -> Request | None:

async with self.get_session(with_simple_commit=True) as session:
# We use the `skip_locked` database mechanism to prevent the 'interception' of requests by another client
if dialect == 'postgresql':
if dialect in {'postgresql', 'mysql', 'mariadb'}:
stmt = stmt.with_for_update(skip_locked=True)
result = await session.execute(stmt)
requests_db = result.scalars().all()
30 changes: 23 additions & 7 deletions src/crawlee/storage_clients/_sql/_storage_client.py
@@ -3,7 +3,7 @@
import warnings
from logging import getLogger
from pathlib import Path
from typing import TYPE_CHECKING
from typing import TYPE_CHECKING, Any

from sqlalchemy.exc import IntegrityError, OperationalError
from sqlalchemy.ext.asyncio import AsyncEngine, async_sessionmaker, create_async_engine
@@ -116,10 +116,10 @@ async def initialize(self, configuration: Configuration) -> None:
async with engine.begin() as conn:
self._dialect_name = engine.dialect.name

if self._dialect_name not in ('sqlite', 'postgresql'):
if self._dialect_name not in {'sqlite', 'postgresql', 'mysql', 'mariadb'}:
raise ValueError(
f'Unsupported database dialect: {self._dialect_name}. Supported: sqlite, postgresql. '
'Consider using a different database.',
f'Unsupported database dialect: {self._dialect_name}. Supported: sqlite, postgresql, mysql, '
'mariadb. Consider using a different database.',
)

# Create tables if they don't exist.
@@ -256,11 +256,26 @@ def _get_or_create_engine(self, configuration: Configuration) -> AsyncEngine:
# Create connection string with path to default database
connection_string = f'sqlite+aiosqlite:///{db_path}'

if 'sqlite' not in connection_string and 'postgresql' not in connection_string:
if (
('sqlite' not in connection_string)
and ('postgresql' not in connection_string)
and ('mysql' not in connection_string)
and ('mariadb' not in connection_string)
):
raise ValueError(
'Unsupported database. Supported: sqlite, postgresql. Consider using a different database.'
'Unsupported database. Supported: sqlite, postgresql, mysql, mariadb. Consider using a different '
'database.'
)

kwargs: dict[str, Any] = {}
if 'mysql' in connection_string or 'mariadb' in connection_string:
connect_args: dict[str, Any] = {'connect_timeout': 30}
# MySQL/MariaDB require READ COMMITTED isolation level for correct behavior in concurrent environments
# without deadlocks.
kwargs['isolation_level'] = 'READ COMMITTED'
else:
connect_args = {'timeout': 30}

self._engine = create_async_engine(
connection_string,
future=True,
@@ -270,6 +285,7 @@ def _get_or_create_engine(self, configuration: Configuration) -> AsyncEngine:
pool_recycle=600,
pool_pre_ping=True,
echo=False,
connect_args={'timeout': 30},
connect_args=connect_args,
**kwargs,
)
return self._engine