From 08393a48dcf8276803db286b73e2b7bfa09a5848 Mon Sep 17 00:00:00 2001 From: Donchess1 Date: Sun, 2 Mar 2025 13:07:19 +0100 Subject: [PATCH 1/4] soft delete retested getblog adjusted with filter --- api/utils/pagination.py | 7 ++++++- api/v1/routes/blog.py | 2 +- 2 files changed, 7 insertions(+), 2 deletions(-) diff --git a/api/utils/pagination.py b/api/utils/pagination.py index 0fab89c9b..b40eb0a31 100644 --- a/api/utils/pagination.py +++ b/api/utils/pagination.py @@ -69,8 +69,13 @@ def paginated_response( if filters and join is None: # Apply filters for attr, value in filters.items(): + if value is not None: - query = query.filter(getattr(model, attr).like(f"%{value}%")) + column = getattr(model, attr) + if isinstance(value, bool): + query = query.filter(column == value) + else: + query = query.filter(column.like(f"%{value}%")) elif filters and join is not None: # Apply filters diff --git a/api/v1/routes/blog.py b/api/v1/routes/blog.py index e8bd9bb09..20d72bf8e 100644 --- a/api/v1/routes/blog.py +++ b/api/v1/routes/blog.py @@ -224,7 +224,7 @@ async def archive_blog_post( """Endpoint to archive/soft-delete a blog post""" blog_service = BlogService(db=db) - blog_post = blog_service.fetch(blog_id=id) + blog_post = blog_service.fetch(blog_id=blog_id) if not blog_post: raise HTTPException(status_code=404, detail="Post not found") #check if admin/ authorized user From 0cff91a8921b06889e3623cf9382cd3c029369d5 Mon Sep 17 00:00:00 2001 From: Donchess1 Date: Sun, 2 Mar 2025 18:34:36 +0100 Subject: [PATCH 2/4] implement blog restore after soft-delete --- api/utils/pagination.py | 7 +--- api/v1/routes/blog.py | 46 +++++++++++++++++++- tests/v1/blog/test_restore_blog.py | 67 ++++++++++++++++++++++++++++++ 3 files changed, 114 insertions(+), 6 deletions(-) create mode 100644 tests/v1/blog/test_restore_blog.py diff --git a/api/utils/pagination.py b/api/utils/pagination.py index b40eb0a31..c6213aef5 100644 --- a/api/utils/pagination.py +++ b/api/utils/pagination.py @@ -89,10 +89,7 @@ def paginated_response( results = jsonable_encoder(query.offset(skip).limit(limit).all()) total_pages = int(total / limit) + (total % limit > 0) - return success_response( - status_code=200, - message="Successfully fetched items", - data={ + return { "pages": total_pages, "total": total, "skip": skip, @@ -107,4 +104,4 @@ def paginated_response( } ) } - ) + diff --git a/api/v1/routes/blog.py b/api/v1/routes/blog.py index 20d72bf8e..cf8a27066 100644 --- a/api/v1/routes/blog.py +++ b/api/v1/routes/blog.py @@ -52,7 +52,20 @@ def create_blog( def get_all_blogs(db: Session = Depends(get_db), limit: int = 10, skip: int = 0): """Endpoint to get all blogs""" - return paginated_response( + blog = paginated_response( + db=db, + model=Blog, + limit=limit, + skip=skip, + ) + + return success_response(200, message="Successfully fetched all blogs", data=blog) + +@blog.get("/active", response_model=success_response) +def get_all_active_blogs(db: Session = Depends(get_db), limit: int = 10, skip: int = 0): + """Endpoint to get all active blogs""" + + blog = paginated_response( db=db, model=Blog, limit=limit, @@ -60,6 +73,7 @@ def get_all_blogs(db: Session = Depends(get_db), limit: int = 10, skip: int = 0) filters={"is_deleted": False} #filter out soft-deleted blogs ) + return success_response(200, message="Successfully fetched active blogs", data=blog) @blog.get("/{id}", response_model=BlogPostResponse) def get_blog_by_id(id: str, db: Session = Depends(get_db)): @@ -111,6 +125,8 @@ async def update_blog( ) + + @blog.post("/{blog_id}/like", 
response_model=BlogLikeDislikeResponse) def like_blog_post( blog_id: str, @@ -241,6 +257,34 @@ async def archive_blog_post( data=jsonable_encoder(blog_post), ) +@blog.put("/{blog_id}/restore") +async def restore_blog_post( + blog_id: str, + db: Session = Depends(get_db), + current_user: User = Depends(user_service.get_current_super_admin), +): + + """Endpoint to restore a soft-deleted blog post""" + + blog_service = BlogService(db=db) + blog_post = blog_service.fetch(blog_id=blog_id) + if not blog_post: + raise HTTPException(status_code=404, detail="Post not found") + #check if admin/ authorized user + if not (blog_post.author_id != current_user.id or current_user.is_superadmin): + raise HTTPException(status_code=403, detail="You don't have permission to perform this action") + if not blog_post.is_deleted: + raise HTTPException(status_code=400, detail="Blog post is already active") + blog_post.is_deleted = False + db.commit() + db.refresh(blog_post) + + return success_response( + message="Blog post restored successfully!", + status_code=200, + data=jsonable_encoder(blog_post), + ) + # Post a comment to a blog diff --git a/tests/v1/blog/test_restore_blog.py b/tests/v1/blog/test_restore_blog.py new file mode 100644 index 000000000..8aeb363f6 --- /dev/null +++ b/tests/v1/blog/test_restore_blog.py @@ -0,0 +1,67 @@ +from unittest.mock import MagicMock, patch + +import pytest +from fastapi.testclient import TestClient +from sqlalchemy.orm import Session +from uuid_extensions import uuid7 + +from api.db.database import get_db +from api.v1.services.user import user_service +from api.v1.models import User +from api.v1.models.blog import Blog +from main import app + + +def mock_get_db(): + db_session = MagicMock() + yield db_session + + +def mock_get_current_super_admin(): + return User(id="1", is_superadmin=True) + +@pytest.fixture +def db_session_mock(): + db_session = MagicMock(spec=Session) + return db_session + +@pytest.fixture +def client(db_session_mock): + app.dependency_overrides[get_db] = lambda: db_session_mock + client = TestClient(app) + yield client + + +def test_restore_blog_success(client, db_session_mock): + '''Test for success in blog archiving''' + + app.dependency_overrides[get_db] = lambda: db_session_mock + app.dependency_overrides[user_service.get_current_super_admin] = lambda: mock_get_current_super_admin() + blog_id = uuid7() + mock_blog = Blog(id=blog_id, title="Test Blog", + content="Test Content", is_deleted=True) + + db_session_mock.query(Blog).filter(Blog.id==blog_id).first.return_value = mock_blog + + response = client.put(f"/api/v1/blogs/{mock_blog.id}/restore", headers={'Authorization': 'Bearer token'}) + + assert response.status_code == 200 + assert response.json()["message"] == "Blog post restored successfully!" 
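+    # Note: db_session_mock is a bare MagicMock, so query()/filter() return the
+    # same child mock regardless of their arguments; the first.return_value set
+    # above is what the endpoint ultimately receives (assuming BlogService.fetch
+    # ends its query chain with .first()). The dependency overrides stand in for
+    # real authentication, so the Authorization header here is only decorative.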
+ + +def test_restore_blog_not_found(client, db_session_mock): + '''test for blog not found''' + + db_session_mock.query(Blog).filter(Blog.id == f'{uuid7()}').first.return_value = None + + app.dependency_overrides[get_db] = lambda: db_session_mock + app.dependency_overrides[user_service.get_current_super_admin] = lambda: mock_get_current_super_admin + + response = client.put(f"/api/v1/blogs/{uuid7()}/restore", headers={'Authorization': 'Bearer token'}) + + assert response.status_code == 404 + assert response.json()["message"] == "Post not found" + + +if __name__ == "__main__": + pytest.main() From 8923a6cd2956639dba58f8deaa3fdcee60a7e9c2 Mon Sep 17 00:00:00 2001 From: Donchess1 Date: Sun, 2 Mar 2025 20:08:48 +0100 Subject: [PATCH 3/4] checking out feat/soft_delete --- api/v1/routes/blog.py | 210 ++++++++++++++++++++++++++++++++++++++++-- 1 file changed, 202 insertions(+), 8 deletions(-) diff --git a/api/v1/routes/blog.py b/api/v1/routes/blog.py index ca475a9be..fd5bc9afd 100644 --- a/api/v1/routes/blog.py +++ b/api/v1/routes/blog.py @@ -1,10 +1,12 @@ from fastapi import ( APIRouter, Depends, HTTPException, status, - HTTPException, Response, Request + HTTPException, Response, Request, Query ) from fastapi.encoders import jsonable_encoder from sqlalchemy.orm import Session -from typing import Annotated +from typing import Annotated, List, Optional +from datetime import datetime +from sqlalchemy import and_, or_, cast, String from api.db.database import get_db from api.utils.pagination import paginated_response @@ -18,7 +20,8 @@ BlogUpdateResponseModel, BlogLikeDislikeResponse, CommentRequest, - CommentUpdateResponseModel + CommentUpdateResponseModel, + BlogSearchResponse ) from api.v1.services.blog import BlogService, BlogDislikeService, BlogLikeService from api.v1.services.user import user_service @@ -57,8 +60,120 @@ def get_all_blogs(db: Session = Depends(get_db), limit: int = 10, skip: int = 0) model=Blog, limit=limit, skip=skip, + filters={"is_deleted": False} #filter out soft-deleted blogs ) +# blog search endpoint +@blog.get("/search", response_model=BlogSearchResponse) +def search_blogs( + db: Session = Depends(get_db), + keyword: Optional[str] = Query(None, description="Search in title and content"), + category: Optional[str] = Query(None, description="Filter by blog category"), + author: Optional[str] = Query(None, description="Filter by author name"), + start_date: Optional[str] = Query(None, description="Start date for date range filter (YYYY-MM-DD)"), + end_date: Optional[str] = Query(None, description="End date for date range filter (YYYY-MM-DD)"), + tags: Optional[str] = Query(None, description="Filter by tags (comma-separated)"), + page: int = Query(1, ge=1, description="Page number"), + per_page: int = Query(10, ge=1, le=100, description="Items per page"), +): + """ + Search and filter blogs based on different parameters. 
+ """ + blog_service = BlogService(db) + + # Build the filters + filters = [] + + if keyword: + filters.append(or_( + Blog.title.ilike(f"%{keyword}%"), + Blog.content.ilike(f"%{keyword}%"), + Blog.excerpt.ilike(f"%{keyword}%") + )) + + if category: + # Assuming category might be stored in tags or as a separate field + filters.append(or_( + cast(Blog.tags, String).ilike(f"%{category}%") + )) + + if author: + query = blog_service.db.query(User.id).filter( + or_( + User.first_name.ilike(f"%{author}%"), + User.last_name.ilike(f"%{author}%"), + ) + ).all() + + author_ids = [user_id[0] for user_id in query] + if author_ids: + filters.append(Blog.author_id.in_(author_ids)) + else: + # No matching authors, return empty result + return { + "status_code": 200, + "total_results": 0, + "blogs": [] + } + + # Rest of the function remains the same + if start_date: + try: + start_date_obj = datetime.strptime(start_date, "%Y-%m-%d") + filters.append(Blog.created_at >= start_date_obj) + except ValueError: + raise HTTPException( + status_code=400, + detail="Invalid start_date format. Use YYYY-MM-DD." + ) + + if end_date: + try: + end_date_obj = datetime.strptime(end_date, "%Y-%m-%d") + # Add 1 day to include the end date + end_date_obj = end_date_obj.replace(hour=23, minute=59, second=59) + filters.append(Blog.created_at <= end_date_obj) + except ValueError: + raise HTTPException( + status_code=400, + detail="Invalid end_date format. Use YYYY-MM-DD." + ) + + if tags: + tag_list = [tag.strip() for tag in tags.split(",")] + for tag in tag_list: + filters.append(cast(Blog.tags, str).ilike(f"%{tag}%")) + + # Get total count and paginated results + search_results = blog_service.search_blogs( + filters=filters, + page=page, + per_page=per_page + ) + + # Fix the tags format in the returned blogs + processed_blogs = [] + for blog in search_results["items"]: + blog_dict = blog + # Convert PostgreSQL array format to Python list + if "tags" in blog_dict and blog_dict["tags"]: + # Check if tags is in PostgreSQL array format + if isinstance(blog_dict["tags"], str) and blog_dict["tags"].startswith('{') and blog_dict["tags"].endswith('}'): + # Remove curly braces and split by commas + tags_str = blog_dict["tags"][1:-1] + # Simple split for basic cases + import re + # This regex handles both quoted and unquoted elements in the array + tags_list = re.findall(r'"([^"]*)"|\s*([^,]+)', tags_str) + # Extract the matched groups and clean them + blog_dict["tags"] = [t[0] or t[1].strip() for t in tags_list if t[0] or t[1].strip()] + processed_blogs.append(blog_dict) + + return { + "status_code": 200, + "total_results": search_results["total"], + "blogs": processed_blogs + } @blog.get("/{id}", response_model=BlogPostResponse) def get_blog_by_id(id: str, db: Session = Depends(get_db)): @@ -77,7 +192,8 @@ def get_blog_by_id(id: str, db: Session = Depends(get_db)): """ blog_service = BlogService(db) - blog_post = blog_service.fetch(id) + # Fetch blog and increment view count + blog_post = blog_service.fetch_and_increment_view(id) return success_response( message="Blog post retrieved successfully!", @@ -202,6 +318,32 @@ def dislike_blog_post( ) +@blog.get("/{post_id}/likes-dislikes") +def get_likes_dislikes_count( + post_id: str, + db: Session = Depends(get_db), +): + """Fetch total number of likes and dislikes for a blog post.""" + blog_service = BlogService(db) + + # Validate if blog post exists + blog_service.fetch(post_id) + + # Fetch like and dislike counts + likes_count = blog_service.num_of_likes(post_id) + dislikes_count = 
blog_service.num_of_dislikes(post_id) + + return success_response( + status_code=status.HTTP_200_OK, + message="Likes and dislikes retrieved successfully", + data={ + "post_id": post_id, + "likes": likes_count, + "dislikes": dislikes_count, + } + ) + + @blog.delete("/{id}", status_code=204) async def delete_blog_post( id: str, @@ -213,6 +355,34 @@ async def delete_blog_post( blog_service = BlogService(db=db) blog_service.delete(blog_id=id) +@blog.put("/{blog_id}/soft_delete") +async def archive_blog_post( + blog_id: str, + db: Session = Depends(get_db), + current_user: User = Depends(user_service.get_current_super_admin), +): + + """Endpoint to archive/soft-delete a blog post""" + + blog_service = BlogService(db=db) + blog_post = blog_service.fetch(blog_id=id) + if not blog_post: + raise HTTPException(status_code=404, detail="Post not found") + #check if admin/ authorized user + if not (blog_post.author_id != current_user.id or current_user.is_superadmin): + raise HTTPException(status_code=403, detail="You don't have permission to perform this action") + + blog_post.is_deleted = True + db.commit() + db.refresh(blog_post) + + return success_response( + message="Blog post archived successfully!", + status_code=200, + data=jsonable_encoder(blog_post), + ) + + # Post a comment to a blog @blog.post("/{blog_id}/comments", response_model=CommentSuccessResponse) @@ -323,11 +493,35 @@ async def delete_blog_like( request: `default` Request. db: `default` Session. """ + + # Validate User + if not current_user: + raise HTTPException(status_code=status.HTTP_401_UNAUTHORIZED, detail="Could not validate credentials") + blog_like_service = BlogLikeService(db) - - # delete blog like - return blog_like_service.delete(blog_like_id, current_user.id) - + + blog_like = blog_like_service.fetch(blog_like_id) + + # Check if blogLike exist + if not blog_like: + raise HTTPException( + status_code=status.HTTP_404_NOT_FOUND, + detail="BlogLike does not exist" + ) + + # Check if current user is the owner of blogLike + if blog_like.user_id != current_user.id: + raise HTTPException( + status_code=status.HTTP_401_UNAUTHORIZED, + detail="Insufficient permission" + ) + + db.delete(blog_like) + db.commit() + + return Response( + status_code=204 + ) @blog.delete("/dislikes/{blog_dislike_id}", status_code=status.HTTP_204_NO_CONTENT) From 0bc7047f81d9aa7402eae8d1ffe44efff6ce3e7d Mon Sep 17 00:00:00 2001 From: Donchess1 Date: Sun, 2 Mar 2025 22:36:55 +0100 Subject: [PATCH 4/4] soft_delete/revert and overall blog check --- api/v1/routes/blog.py | 163 +++--------------------- tests/v1/blog/get_all_blogs_test.py | 53 ++------ tests/v1/blog/get_archive_blogs_test.py | 100 +++++++++++++++ 3 files changed, 127 insertions(+), 189 deletions(-) create mode 100644 tests/v1/blog/get_archive_blogs_test.py diff --git a/api/v1/routes/blog.py b/api/v1/routes/blog.py index 64457237c..5a875d70b 100644 --- a/api/v1/routes/blog.py +++ b/api/v1/routes/blog.py @@ -42,7 +42,7 @@ def create_blog( if not current_user: raise HTTPException(status_code=401, detail="You are not Authorized") blog_service = BlogService(db) - new_blogpost = blog_service.create(db=db, schema=blog, author_id=current_user.id) + new_blogpost = blog_service.create(schema=blog, author_id=current_user.id) return success_response( message="Blog created successfully!", @@ -60,125 +60,11 @@ def get_all_blogs(db: Session = Depends(get_db), limit: int = 10, skip: int = 0) model=Blog, limit=limit, skip=skip, - filters={"is_deleted": False} #filter out soft-deleted blogs - ) - -# blog 
search endpoint -@blog.get("/search", response_model=BlogSearchResponse) -def search_blogs( - db: Session = Depends(get_db), - keyword: Optional[str] = Query(None, description="Search in title and content"), - category: Optional[str] = Query(None, description="Filter by blog category"), - author: Optional[str] = Query(None, description="Filter by author name"), - start_date: Optional[str] = Query(None, description="Start date for date range filter (YYYY-MM-DD)"), - end_date: Optional[str] = Query(None, description="End date for date range filter (YYYY-MM-DD)"), - tags: Optional[str] = Query(None, description="Filter by tags (comma-separated)"), - page: int = Query(1, ge=1, description="Page number"), - per_page: int = Query(10, ge=1, le=100, description="Items per page"), -): - """ - Search and filter blogs based on different parameters. - """ - blog_service = BlogService(db) - - # Build the filters - filters = [] - - if keyword: - filters.append(or_( - Blog.title.ilike(f"%{keyword}%"), - Blog.content.ilike(f"%{keyword}%"), - Blog.excerpt.ilike(f"%{keyword}%") - )) - - if category: - # Assuming category might be stored in tags or as a separate field - filters.append(or_( - cast(Blog.tags, String).ilike(f"%{category}%") - )) - - if author: - query = blog_service.db.query(User.id).filter( - or_( - User.first_name.ilike(f"%{author}%"), - User.last_name.ilike(f"%{author}%"), - ) - ).all() - - author_ids = [user_id[0] for user_id in query] - if author_ids: - filters.append(Blog.author_id.in_(author_ids)) - else: - # No matching authors, return empty result - return { - "status_code": 200, - "total_results": 0, - "blogs": [] - } - - # Rest of the function remains the same - if start_date: - try: - start_date_obj = datetime.strptime(start_date, "%Y-%m-%d") - filters.append(Blog.created_at >= start_date_obj) - except ValueError: - raise HTTPException( - status_code=400, - detail="Invalid start_date format. Use YYYY-MM-DD." - ) - - if end_date: - try: - end_date_obj = datetime.strptime(end_date, "%Y-%m-%d") - # Add 1 day to include the end date - end_date_obj = end_date_obj.replace(hour=23, minute=59, second=59) - filters.append(Blog.created_at <= end_date_obj) - except ValueError: - raise HTTPException( - status_code=400, - detail="Invalid end_date format. Use YYYY-MM-DD." 
- ) - - if tags: - tag_list = [tag.strip() for tag in tags.split(",")] - for tag in tag_list: - filters.append(cast(Blog.tags, str).ilike(f"%{tag}%")) - - # Get total count and paginated results - search_results = blog_service.search_blogs( - filters=filters, - page=page, - per_page=per_page ) - - # Fix the tags format in the returned blogs - processed_blogs = [] - for blog in search_results["items"]: - blog_dict = blog - # Convert PostgreSQL array format to Python list - if "tags" in blog_dict and blog_dict["tags"]: - # Check if tags is in PostgreSQL array format - if isinstance(blog_dict["tags"], str) and blog_dict["tags"].startswith('{') and blog_dict["tags"].endswith('}'): - # Remove curly braces and split by commas - tags_str = blog_dict["tags"][1:-1] - # Simple split for basic cases - import re - # This regex handles both quoted and unquoted elements in the array - tags_list = re.findall(r'"([^"]*)"|\s*([^,]+)', tags_str) - # Extract the matched groups and clean them - blog_dict["tags"] = [t[0] or t[1].strip() for t in tags_list if t[0] or t[1].strip()] - processed_blogs.append(blog_dict) - - return { - "status_code": 200, - "total_results": search_results["total"], - "blogs": processed_blogs - } return success_response(200, message="Successfully fetched all blogs", data=blog) @blog.get("/active", response_model=success_response) def get_all_active_blogs(db: Session = Depends(get_db), limit: int = 10, skip: int = 0): - """Endpoint to get all active blogs""" blog = paginated_response( db=db, @@ -189,6 +75,20 @@ def get_all_active_blogs(db: Session = Depends(get_db), limit: int = 10, skip: i ) return success_response(200, message="Successfully fetched active blogs", data=blog) + +@blog.get("/archive", response_model=success_response) +def get_all_blogs(db: Session = Depends(get_db), limit: int = 10, skip: int = 0): + + blog = paginated_response( + db=db, + model=Blog, + limit=limit, + skip=skip, + filters={"is_deleted": True} + ) + return success_response(200, message="Successfully fetched all archived blogs", data=blog) + + # blog search endpoint @blog.get("/search", response_model=BlogSearchResponse) def search_blogs( @@ -300,6 +200,8 @@ def search_blogs( "total_results": search_results["total"], "blogs": processed_blogs } + return success_response(200, message="Successfully fetched all blogs", data=blog) + @blog.get("/{id}", response_model=BlogPostResponse) def get_blog_by_id(id: str, db: Session = Depends(get_db)): @@ -352,8 +254,6 @@ async def update_blog( ) - - @blog.post("/{blog_id}/like", response_model=BlogLikeDislikeResponse) def like_blog_post( blog_id: str, @@ -489,35 +389,6 @@ async def archive_blog_post( db: Session = Depends(get_db), current_user: User = Depends(user_service.get_current_super_admin), ): - - """Endpoint to archive/soft-delete a blog post""" - - blog_service = BlogService(db=db) - blog_post = blog_service.fetch(blog_id=id) - if not blog_post: - raise HTTPException(status_code=404, detail="Post not found") - #check if admin/ authorized user - if not (blog_post.author_id != current_user.id or current_user.is_superadmin): - raise HTTPException(status_code=403, detail="You don't have permission to perform this action") - - blog_post.is_deleted = True - db.commit() - db.refresh(blog_post) - - return success_response( - message="Blog post archived successfully!", - status_code=200, - data=jsonable_encoder(blog_post), - ) - - -@blog.put("/{blog_id}/soft_delete") -async def archive_blog_post( - blog_id: str, - db: Session = Depends(get_db), - current_user: User = 
Depends(user_service.get_current_super_admin), -): - """Endpoint to archive/soft-delete a blog post""" blog_service = BlogService(db=db) diff --git a/tests/v1/blog/get_all_blogs_test.py b/tests/v1/blog/get_all_blogs_test.py index 36bd414c2..a4d3c33d3 100644 --- a/tests/v1/blog/get_all_blogs_test.py +++ b/tests/v1/blog/get_all_blogs_test.py @@ -1,22 +1,14 @@ -from datetime import datetime, timedelta, timezone from unittest.mock import MagicMock - import pytest from fastapi.testclient import TestClient from sqlalchemy.orm import Session -from uuid_extensions import uuid7 - -from api.v1.models.blog import Blog from api.v1.routes.blog import get_db - from main import app - # Mock database dependency @pytest.fixture def db_session_mock(): - db_session = MagicMock(spec=Session) - return db_session + return MagicMock(spec=Session) @pytest.fixture def client(db_session_mock): @@ -26,60 +18,35 @@ def client(db_session_mock): app.dependency_overrides = {} def test_get_all_blogs_empty(client, db_session_mock): - # Mock data - mock_blog_data = [] - mock_query = MagicMock() mock_query.count.return_value = 0 mock_query.all.return_value = [] - + + mock_query.filter.return_value = mock_query + mock_query.offset.return_value = mock_query + mock_query.limit.return_value = mock_query db_session_mock.query.return_value = mock_query - db_session_mock.query.return_value.filter.return_value.offset.return_value.limit.return_value.all.return_value = mock_blog_data - - # Call the endpoint response = client.get("/api/v1/blogs") - # Assert the response assert response.status_code == 200 assert response.json()["data"]["items"] == [] def test_get_all_blogs_with_data(client, db_session_mock): - blog_id = str(uuid7()) - author_id = str(uuid7()) - timezone_offset = -8.0 - tzinfo = timezone(timedelta(hours=timezone_offset)) - timeinfo = datetime.now(tzinfo) - created_at = timeinfo - updated_at = timeinfo - - # Mock data mock_blog_data = [ - Blog( - id=blog_id, - author_id=author_id, - title="Test Blog", - content="Test Content", - image_url="http://example.com/image.png", - tags=["test", "blog"], - is_deleted=False, - excerpt="Test Excerpt", - created_at=created_at, - updated_at=updated_at - ) + {"id": "123", "title": "Test Blog", "content": "Test Content"} ] mock_query = MagicMock() mock_query.count.return_value = 1 - db_session_mock.query.return_value.filter.return_value.offset.return_value.limit.return_value.all.return_value = mock_blog_data mock_query.all.return_value = mock_blog_data + mock_query.filter.return_value = mock_query + mock_query.offset.return_value = mock_query + mock_query.limit.return_value = mock_query db_session_mock.query.return_value = mock_query - # Call the endpoint response = client.get("/api/v1/blogs") - # Assert the response assert response.status_code == 200 - assert len(response.json().get('data')) >= 1 - + assert len(response.json()["data"]["items"]) >= 1 \ No newline at end of file diff --git a/tests/v1/blog/get_archive_blogs_test.py b/tests/v1/blog/get_archive_blogs_test.py new file mode 100644 index 000000000..25400935f --- /dev/null +++ b/tests/v1/blog/get_archive_blogs_test.py @@ -0,0 +1,100 @@ +from datetime import datetime, timedelta, timezone +from unittest.mock import MagicMock + +import pytest +from fastapi.testclient import TestClient +from sqlalchemy.orm import Session +from uuid_extensions import uuid7 + +from api.v1.models.blog import Blog +from api.v1.routes.blog import get_db + +from main import app +@pytest.fixture +def mock_db_session(): + mock_db = MagicMock(spec=Session) 
+ return mock_db + +@pytest.fixture +def client(db_session_mock): + app.dependency_overrides[get_db] = lambda: db_session_mock + client = TestClient(app) + yield client + app.dependency_overrides = {} + + +def test_get_archive_blogs(mock_db_session, monkeypatch): + def override_get_db(): + return mock_db_session + + monkeypatch.setattr("api.v1.routes.blog.get_db", override_get_db) + + mock_db_session.query.return_value.filter.return_value.offset.return_value.limit.return_value.all.return_value = [] + + response = client.get("/blog/archive?limit=5&skip=0") + + + assert response.status_code == 200 + json_data = response.json() + assert json_data["status"] == "success" + assert json_data["message"] == "Successfully fetched all archived blogs" + assert isinstance(json_data["data"], list) + + +# def test_get_all_active_blogs(mock_db_session): +# response = client.get("/api/v1/blogs/active") +# assert response.status_code == 200 +# assert response.json()["message"] == "Successfully fetched active blogs" +# assert isinstance(response.json()["data"], list) + + + +@pytest.fixture +def mock_db_session(): + return MagicMock(spec=Session) + +@pytest.fixture +def client(mock_db_session): + app.dependency_overrides[get_db] = lambda: mock_db_session + client = TestClient(app) + yield client + app.dependency_overrides = {} + +def test_get_archive_blogs(client, mock_db_session): + mock_db_session.query.return_value.filter.return_value.offset.return_value.limit.return_value.all.return_value = [] + + response = client.get("/api/v1/blogs/archive?limit=5&skip=0") + + assert response.status_code == 200 + json_data = response.json() + assert json_data["status"] == "success" + assert json_data["message"] == "Successfully fetched all archived blogs" + assert isinstance(json_data["data"]["items"], list) + + +from unittest.mock import MagicMock +import pytest +from fastapi.testclient import TestClient +from sqlalchemy.orm import Session +from api.v1.routes.blog import get_db +from main import app + +@pytest.fixture +def mock_db_session(): + return MagicMock(spec=Session) + +@pytest.fixture +def client(mock_db_session): + app.dependency_overrides[get_db] = lambda: mock_db_session + client = TestClient(app) + yield client + app.dependency_overrides = {} + +def test_get_all_active_blogs(client, mock_db_session): + mock_db_session.query.return_value.filter.return_value.offset.return_value.limit.return_value.all.return_value = [] + + response = client.get("/api/v1/blogs/active") + + assert response.status_code == 200 + assert response.json()["message"] == "Successfully fetched active blogs" + assert isinstance(response.json()["data"]["items"], list) \ No newline at end of file