375 lines
12 KiB
Python
375 lines
12 KiB
Python
"""
|
|
Unit tests for StorageService (async wrapper for storage backends).
|
|
"""
|
|
|
|
from __future__ import annotations
|
|
|
|
import io
|
|
from unittest.mock import MagicMock
|
|
|
|
import pytest
|
|
|
|
from cpv3.infrastructure.storage.base import StorageService
|
|
from cpv3.infrastructure.storage.types import FileInfo
|
|
|
|
|
|
@pytest.fixture
def mock_backend() -> MagicMock:
    """Build a stand-in storage backend whose methods are all MagicMocks.

    Methods that produce a value are pre-loaded with a canned result so
    tests only need to override the ones they care about.
    """
    canned_results = {
        "exists": True,
        "size": 1024,
        "read": b"file content",
        "generate_url": "https://example.com/file.txt",
    }

    stub = MagicMock()
    # Methods with no meaningful return value get a plain mock.
    for method_name in ("upload_fileobj", "download_fileobj", "delete"):
        setattr(stub, method_name, MagicMock())
    # Methods with a result get a mock primed with that result.
    for method_name, result in canned_results.items():
        setattr(stub, method_name, MagicMock(return_value=result))
    return stub
|
|
|
|
|
|
@pytest.fixture
def storage_service(mock_backend: MagicMock) -> StorageService:
    """Provide a StorageService wired to the mocked backend fixture."""
    service = StorageService(mock_backend)
    return service
|
|
|
|
|
|
class TestStorageServiceUpload:
    """Tests for StorageService upload functionality."""

    @pytest.mark.asyncio
    async def test_upload_fileobj_with_generated_name(
        self, storage_service: StorageService, mock_backend: MagicMock
    ):
        """Test uploading a file with auto-generated name."""
        fileobj = io.BytesIO(b"test content")

        key = await storage_service.upload_fileobj(
            fileobj=fileobj,
            file_name="original.txt",
            folder="uploads",
            gen_name=True,
            content_type="text/plain",
        )

        # Key should contain folder and have .txt extension
        assert key.startswith("uploads/")
        assert key.endswith(".txt")
        # Key should not contain original filename
        assert "original" not in key

        mock_backend.upload_fileobj.assert_called_once()

    @pytest.mark.asyncio
    async def test_upload_fileobj_with_original_name(
        self, storage_service: StorageService, mock_backend: MagicMock
    ):
        """Test uploading a file keeping original name."""
        fileobj = io.BytesIO(b"test content")

        key = await storage_service.upload_fileobj(
            fileobj=fileobj,
            file_name="myfile.txt",
            folder="uploads",
            gen_name=False,
            content_type="text/plain",
        )

        assert key == "uploads/myfile.txt"
        mock_backend.upload_fileobj.assert_called_once()

    @pytest.mark.asyncio
    async def test_upload_fileobj_without_folder(
        self, storage_service: StorageService, mock_backend: MagicMock
    ):
        """Test uploading a file without folder."""
        fileobj = io.BytesIO(b"test content")

        key = await storage_service.upload_fileobj(
            fileobj=fileobj,
            file_name="myfile.txt",
            folder="",
            gen_name=False,
        )

        # With an empty folder the key is expected to be the bare file name.
        assert key == "myfile.txt"
        mock_backend.upload_fileobj.assert_called_once()

    @pytest.mark.asyncio
    async def test_upload_fileobj_without_extension(
        self, storage_service: StorageService, mock_backend: MagicMock
    ):
        """Test uploading a file without extension."""
        fileobj = io.BytesIO(b"test content")

        key = await storage_service.upload_fileobj(
            fileobj=fileobj,
            file_name="noextension",
            folder="uploads",
            gen_name=True,
        )

        # Generated name should live in the folder and carry no extension
        assert key.startswith("uploads/")
        assert "." not in key.split("/")[-1]  # No extension in filename
        mock_backend.upload_fileobj.assert_called_once()

    @pytest.mark.asyncio
    async def test_upload_fileobj_seeks_to_start(
        self, storage_service: StorageService, mock_backend: MagicMock
    ):
        """Test that upload_fileobj seeks to start of file."""
        fileobj = io.BytesIO(b"test content")
        fileobj.seek(5)  # Move position away from the start

        await storage_service.upload_fileobj(
            fileobj=fileobj,
            file_name="test.txt",
            folder="",
            gen_name=False,
        )

        mock_backend.upload_fileobj.assert_called_once()
        # The mocked backend never reads from the stream, so the position
        # observed here is whatever the service left it at. The previous
        # form `tell() == 0 or upload_fileobj.called` was always true after
        # assert_called_once() and therefore could not detect a missing
        # rewind.
        assert fileobj.tell() == 0
|
|
|
|
|
|
class TestStorageServiceExists:
    """Checks on how StorageService.exists reports the backend's answer."""

    @pytest.mark.asyncio
    async def test_exists_returns_true(
        self, storage_service: StorageService, mock_backend: MagicMock
    ):
        """A backend reporting the key as present should yield True."""
        lookup_key = "test/file.txt"
        mock_backend.exists.return_value = True

        found = await storage_service.exists(lookup_key)

        assert found is True
        mock_backend.exists.assert_called_once_with(lookup_key)

    @pytest.mark.asyncio
    async def test_exists_returns_false(
        self, storage_service: StorageService, mock_backend: MagicMock
    ):
        """A backend reporting the key as absent should yield False."""
        mock_backend.exists.return_value = False

        found = await storage_service.exists("nonexistent.txt")

        assert found is False
|
|
|
|
|
|
class TestStorageServiceDelete:
    """Checks that StorageService.delete is forwarded to the backend."""

    @pytest.mark.asyncio
    async def test_delete_calls_backend(
        self, storage_service: StorageService, mock_backend: MagicMock
    ):
        """Deleting a key should hit the backend exactly once with that key."""
        target_key = "test/file.txt"

        await storage_service.delete(target_key)

        mock_backend.delete.assert_called_once_with(target_key)
|
|
|
|
|
|
class TestStorageServiceSize:
    """Checks on the value StorageService.size hands back."""

    @pytest.mark.asyncio
    async def test_size_returns_file_size(
        self, storage_service: StorageService, mock_backend: MagicMock
    ):
        """The size reported by the backend should be returned unchanged."""
        target_key = "test/file.txt"
        mock_backend.size.return_value = 12345

        reported_size = await storage_service.size(target_key)

        assert reported_size == 12345
        mock_backend.size.assert_called_once_with(target_key)
|
|
|
|
|
|
class TestStorageServiceRead:
    """Checks on the bytes StorageService.read hands back."""

    @pytest.mark.asyncio
    async def test_read_returns_content(
        self, storage_service: StorageService, mock_backend: MagicMock
    ):
        """The bytes produced by the backend should be returned unchanged."""
        target_key = "test/file.txt"
        payload = b"file content bytes"
        mock_backend.read.return_value = payload

        received = await storage_service.read(target_key)

        assert received == payload
        mock_backend.read.assert_called_once_with(target_key)
|
|
|
|
|
|
class TestStorageServiceUrl:
    """Checks on the URL StorageService.url hands back."""

    @pytest.mark.asyncio
    async def test_url_returns_presigned_url(
        self, storage_service: StorageService, mock_backend: MagicMock
    ):
        """The URL generated by the backend should be returned unchanged."""
        target_key = "test/file.txt"
        signed_url = "https://s3.example.com/bucket/file.txt?signature=xyz"
        mock_backend.generate_url.return_value = signed_url

        received = await storage_service.url(target_key)

        assert received == signed_url
        mock_backend.generate_url.assert_called_once_with(target_key)
|
|
|
|
|
|
class TestStorageServiceGetFileInfo:
    """Checks on the FileInfo assembled by StorageService.get_file_info."""

    @pytest.mark.asyncio
    async def test_get_file_info_returns_file_info(
        self, storage_service: StorageService, mock_backend: MagicMock
    ):
        """An existing key should produce a fully populated FileInfo."""
        stored_key = "uploads/file.txt"
        stored_url = "https://example.com/uploads/file.txt"
        mock_backend.exists.return_value = True
        mock_backend.generate_url.return_value = stored_url
        mock_backend.size.return_value = 2048

        info = await storage_service.get_file_info(stored_key)

        assert isinstance(info, FileInfo)
        assert info.file_path == stored_key
        assert info.file_url == stored_url
        assert info.file_size == 2048
        assert info.filename == "file.txt"

    @pytest.mark.asyncio
    async def test_get_file_info_raises_when_not_found(
        self, storage_service: StorageService, mock_backend: MagicMock
    ):
        """A missing key should raise FileNotFoundError naming that key."""
        missing_key = "nonexistent.txt"
        mock_backend.exists.return_value = False

        with pytest.raises(FileNotFoundError) as raised:
            await storage_service.get_file_info(missing_key)

        assert missing_key in str(raised.value)

    @pytest.mark.asyncio
    async def test_get_file_info_extracts_filename_from_path(
        self, storage_service: StorageService, mock_backend: MagicMock
    ):
        """Only the last path segment should be reported as the filename."""
        mock_backend.exists.return_value = True
        mock_backend.generate_url.return_value = "https://example.com/path"
        mock_backend.size.return_value = 100

        info = await storage_service.get_file_info("deep/nested/path/myfile.mp4")

        assert info.filename == "myfile.mp4"
|
|
|
|
|
|
class TestStorageServiceIntegration:
    """Integration-style tests for StorageService.

    The backend is still the mocked fixture; these tests chain several
    service calls together rather than exercising one method in isolation.
    """

    @pytest.mark.asyncio
    async def test_upload_then_get_info(
        self, storage_service: StorageService, mock_backend: MagicMock
    ):
        """Test uploading a file and getting its info."""
        mock_backend.exists.return_value = True
        mock_backend.size.return_value = 1024
        mock_backend.generate_url.return_value = "https://example.com/url"

        # Upload
        fileobj = io.BytesIO(b"test content")
        key = await storage_service.upload_fileobj(
            fileobj=fileobj,
            file_name="video.mp4",
            folder="media",
            gen_name=False,
            content_type="video/mp4",
        )

        # Get info for the key the upload returned
        info = await storage_service.get_file_info(key)

        assert info.file_path == key
        assert info.file_size == 1024

    @pytest.mark.asyncio
    async def test_full_workflow(
        self, storage_service: StorageService, mock_backend: MagicMock
    ):
        """Test full file management workflow."""
        mock_backend.exists.return_value = True
        mock_backend.size.return_value = 500
        mock_backend.read.return_value = b"content"
        mock_backend.generate_url.return_value = "https://url"

        # Upload
        fileobj = io.BytesIO(b"content")
        key = await storage_service.upload_fileobj(
            fileobj=fileobj,
            file_name="doc.pdf",
            folder="documents",
            gen_name=False,
        )

        # Check exists
        exists = await storage_service.exists(key)
        assert exists is True

        # Get size
        size = await storage_service.size(key)
        assert size == 500

        # Read content
        content = await storage_service.read(key)
        assert content == b"content"

        # Get URL
        url = await storage_service.url(key)
        assert url == "https://url"

        # Delete: verify the uploaded key itself was the one deleted, not
        # merely that some delete call happened.
        await storage_service.delete(key)
        mock_backend.delete.assert_called_once_with(key)
|
|
|
|
|
|
class TestFileInfoDataclass:
    """Checks on constructing and mutating FileInfo instances."""

    def test_file_info_creation(self):
        """Every field passed to the constructor should be readable back."""
        field_values = {
            "file_path": "path/to/file.txt",
            "file_url": "https://example.com/file.txt",
            "file_size": 1024,
            "filename": "file.txt",
        }

        info = FileInfo(**field_values)

        for field_name, expected in field_values.items():
            assert getattr(info, field_name) == expected

    def test_file_info_optional_fields(self):
        """Omitted size and filename are expected to come back as None."""
        info = FileInfo(
            file_path="path/to/file.txt",
            file_url="https://example.com/file.txt",
        )

        for optional_field in ("file_size", "filename"):
            assert getattr(info, optional_field) is None

    def test_file_info_is_frozen(self):
        """Assigning to a field is expected to raise AttributeError."""
        info = FileInfo(
            file_path="path/to/file.txt",
            file_url="https://example.com/file.txt",
        )

        with pytest.raises(AttributeError):
            info.file_path = "new/path"  # type: ignore
|