GEO Architecture Analysis

Analysis of the GEO product migration from Firebase Functions to FastAPI/PostgreSQL, identifying redundancies, extracting core primitives, and planning modular refactoring.

Migration Status: Complete
All 9 GEO APIs have been migrated to FastAPI/PostgreSQL. Database tables are created and tested. Ready for the refactoring phase.

- APIs Migrated: 9 (all endpoints operational)
- Database Tables: 21 (across 3 schemas)
- Redundancies Found: 4 (critical areas requiring consolidation)
- Quick Wins: 4 (low effort, high impact changes)
Critical Redundancies

Four major areas where duplicate code, dual data models, or parallel systems create unnecessary complexity and maintenance burden.

| Redundancy | Description | Impact | Location | Status |
|---|---|---|---|---|
| Dual Search Systems | Both search.py and mention_scraper.py independently call the DataForSEO API with separate implementations | HIGH | geo/api/search.py:245, functions/mention_scraper.py:89 | Planned |
| Schema Split Chaos | geo_saves exists in both public and geo schemas; foreign keys reference the wrong schema | HIGH | migrations/012_geo_tables.sql:15, migrations/014_geo_mentions.sql:42 | Planned |
| Triple Progress Tracking | Progress is tracked in 3 places: mention_scans.status, reports.status, and the async_operations table | MEDIUM | migrations/014_geo_mentions.sql:68, migrations/015_geo_remaining_apis.sql:116 | Planned |
| Classification Locked in Mentions | Sentiment/relevance analysis is only available to the mentions pipeline, but reports could benefit from the same analysis | MEDIUM | geo/pipeline/mention_classifier.py:1 | Planned |
Core Primitives

Four fundamental operations that underpin all GEO features. These should be extracted as standalone services.

Search

Execute query across search engines (Google, ChatGPT, Perplexity, Gemini). Returns structured SERP data with positions, URLs, snippets.

Used by: search.py, mentions, reports
Analysis

Classify search results by sentiment (-1 to 1), relevance score (0-100), authority metrics. Uses GPT-4o for semantic analysis.

Used by: mentions (locked)
Position Tracking

Find where user's domain appears in search results. Track rank changes over time with delta calculations.

Used by: search.py, reports
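The rank and delta calculation described above could look roughly like this; a minimal sketch, assuming SERP results arrive as dicts with a `url` field (the function names are illustrative, not the actual GEO code):

```python
from typing import Any, Dict, List, Optional
from urllib.parse import urlparse


def find_position(results: List[Dict[str, Any]], domain: str) -> Optional[int]:
    """Return the 1-based rank of the first result on the user's domain."""
    for rank, result in enumerate(results, start=1):
        host = urlparse(result["url"]).netloc.lower()
        if host == domain or host.endswith("." + domain):
            return rank
    return None  # Domain not found in this SERP


def rank_delta(current: Optional[int], previous: Optional[int]) -> Optional[int]:
    """Positive delta means the site moved up (e.g. rank 5 -> 3 yields +2)."""
    if current is None or previous is None:
        return None
    return previous - current
```

Matching on the registrable domain (including subdomains) rather than the full URL keeps blog and www variants counted as the user's site.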
Progress

Track async operation status with real-time updates. WebSocket notifications for UI feedback during long-running jobs.

Used by: all async operations
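As a rough sketch of this primitive, with a plain callback standing in for the real WebSocket layer (all names here are illustrative assumptions):

```python
from dataclasses import dataclass, field
from typing import Any, Callable, Dict, List


@dataclass
class ProgressTracker:
    """Tracks one async operation and fans out status events to listeners."""

    operation_id: str
    listeners: List[Callable[[Dict[str, Any]], None]] = field(default_factory=list)

    def subscribe(self, listener: Callable[[Dict[str, Any]], None]) -> None:
        self.listeners.append(listener)

    def update(self, status: str, progress: int, current_step: str = "") -> Dict[str, Any]:
        event = {
            "operation_id": self.operation_id,
            "status": status,
            "progress": progress,
            "current_step": current_step,
        }
        # In the real service this state would be persisted to
        # shared.async_operations and pushed to the UI over a WebSocket.
        for listener in self.listeners:
            listener(event)
        return event
```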
Extract search_service.py

Consolidate all DataForSEO calls into a single service. Eliminates duplicate code in search.py and mention_scraper.py.

Impact: High
Reduces the API integration surface from two implementations to one and makes rate limiting and error handling consistent.
Implementation Plan

Create geo/services/search_service.py with a unified interface:

""" GEO Search Service - Unified DataForSEO Integration """ from typing import List, Dict, Any, Optional def execute_search( query: str, engine: str, # google, bing, etc. save_id: int, user_id: int, num_results: int = 10, location_code: Optional[int] = None ) -> Dict[str, Any]: """ Execute search query via DataForSEO API. Returns: { "query_id": int, # Database ID "results": [...], # Formatted SERP data "position": int | None, # User's rank "raw_response": {...} # Full API response } """ # Single DataForSEO implementation # Rate limiting, error handling, retry logic # Position calculation # Database storage pass

Update callers to use the new service:

```python
# geo/api/search.py (BEFORE)
url = "https://api.dataforseo.com/v3/serp/google/organic/live/advanced"
response = requests.post(url, headers=headers, json=post_data)
# ... 50 lines of processing

# geo/api/search.py (AFTER)
from geo.services.search_service import execute_search

result = execute_search(query, "google", save_id, user_id)
```

```python
# geo/pipeline/mention_scraper.py (BEFORE)
DATAFORSEO_SEARCH_URL = "https://api.dataforseo.com/..."
resp = requests.post(DATAFORSEO_SEARCH_URL, ...)
# ... duplicate DataForSEO code

# geo/pipeline/mention_scraper.py (AFTER)
from geo.services.search_service import execute_search

result = execute_search(query_text, "google", scan_id, user_id)
```
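The consistent error handling the unified service promises could be built around a small retry helper like the sketch below; this is one possible shape under stated assumptions, not the actual implementation:

```python
import random
import time
from typing import Any, Callable


def with_retries(call: Callable[[], Any], max_attempts: int = 3,
                 base_delay: float = 0.5) -> Any:
    """Retry a flaky API call with exponential backoff plus jitter."""
    for attempt in range(1, max_attempts + 1):
        try:
            return call()
        except Exception:
            if attempt == max_attempts:
                raise  # Exhausted retries; surface the last error
            # Exponential backoff: 0.5s, 1s, 2s, ... plus a little jitter
            time.sleep(base_delay * 2 ** (attempt - 1) + random.uniform(0, 0.1))
```

Every DataForSEO request in the service would then go through one wrapper, so backoff policy changes happen in a single place.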
Unify geo_saves Schema

Consolidate public.geo_saves and geo.geo_saves into single schema. Fix all foreign key references.

Decision Required
Choose the target schema: geo.geo_saves (recommended) vs. public.geo_saves. All GEO tables should live in the geo.* namespace.
Migration Strategy
```sql
-- Migration: 016_unify_geo_saves.sql
BEGIN;

-- 1. Copy data from public.geo_saves to geo.geo_saves
INSERT INTO geo.geo_saves
SELECT * FROM public.geo_saves
ON CONFLICT (id) DO UPDATE SET
    name = EXCLUDED.name,
    url = EXCLUDED.url,
    -- ... all fields
    updated_at = NOW();

-- 2. Update foreign key references
ALTER TABLE public.geo_queries
    DROP CONSTRAINT IF EXISTS geo_queries_save_id_fkey;
ALTER TABLE public.geo_queries
    ADD CONSTRAINT geo_queries_save_id_fkey
    FOREIGN KEY (save_id) REFERENCES geo.geo_saves(id)
    ON DELETE CASCADE;

-- 3. Drop old table
DROP TABLE public.geo_saves;

-- 4. Update sequence
SELECT setval('geo.geo_saves_id_seq', (SELECT MAX(id) FROM geo.geo_saves));

COMMIT;
```

Update all code references:

```python
# Before: mixed references
db.query("SELECT * FROM geo_saves WHERE ...")      # public schema
db.query("SELECT * FROM geo.geo_saves WHERE ...")  # geo schema

# After: consistent
db.query("SELECT * FROM geo.geo_saves WHERE ...")  # Always the geo schema
```
Centralize Progress Tracking

Remove custom status fields from geo.mention_scans and geo.reports. Use only shared.async_operations for all progress.

Schema Changes
```sql
-- Migration: 017_centralize_progress.sql
BEGIN;

-- 1. Remove redundant status columns
ALTER TABLE geo.mention_scans
    DROP COLUMN IF EXISTS status,
    DROP COLUMN IF EXISTS current_stage,
    DROP COLUMN IF EXISTS progress_percentage;

ALTER TABLE geo.reports
    DROP COLUMN IF EXISTS status;

-- 2. Keep only a completion timestamp
--    (status lives in async_operations; completion is immutable metadata)
ALTER TABLE geo.mention_scans ADD COLUMN completed_at TIMESTAMPTZ;
ALTER TABLE geo.reports ADD COLUMN completed_at TIMESTAMPTZ;

COMMIT;
```

Update code to use async_operations:

```python
# Before: dual writes
db.update("geo.mention_scans", scan_id, {"status": "scanning", "progress": 25})
db.update("shared.async_operations", op_id, {"status": "in_progress", "progress": 25})

# After: single source of truth
db.update("shared.async_operations", op_id, {
    "status": "in_progress",
    "progress": 25,
    "current_step": "Scanning web sources...",
})

# Frontend queries async_operations only
async_op = db.query_one(
    "SELECT * FROM shared.async_operations WHERE operation_id = %s",
    (scan_id,),
)
return {"status": async_op["status"], "progress": async_op["progress"]}
```
Extract Classification Service

Make sentiment/relevance analysis standalone. Reports can optionally use classification on their search results.

User Preference
Mentions and keyword AI results are "pretty different"; sentiment analysis should be an optional layer that can sit on top of any search results.
Service Interface
""" GEO Analysis Service - Sentiment, Relevance, Authority """ from typing import List, Dict, Any def analyze_sentiment( text: str, context: Dict[str, Any] ) -> float: """ Analyze sentiment of text. Returns: -1.0 (very negative) to 1.0 (very positive) """ # GPT-4o analysis pass def score_relevance( result: Dict[str, Any], profile: Dict[str, Any] ) -> int: """ Score how relevant a search result is to user's business. Returns: 0-100 relevance score """ pass def calculate_authority( domain: str, result_metadata: Dict[str, Any] ) -> int: """ Calculate authority score of source domain. Returns: 0-100 authority score """ pass def analyze_search_results( results: List[Dict[str, Any]], profile: Dict[str, Any], include_sentiment: bool = True, include_relevance: bool = True, include_authority: bool = True ) -> List[Dict[str, Any]]: """ Batch analyze search results with optional features. Returns: Results enriched with sentiment, relevance, authority scores """ pass

Usage patterns:

```python
# Mentions pipeline (always uses classification)
from geo.services.analysis_service import analyze_search_results

enriched_results = analyze_search_results(
    results=raw_search_results,
    profile=mention_profile,
    include_sentiment=True,
    include_relevance=True,
    include_authority=True,
)

# Reports (optional classification)
if user_settings.get("enable_classification"):
    enriched_results = analyze_search_results(
        results=report_search_results,
        profile=report_profile,
        include_sentiment=False,  # Maybe not useful for ranking reports
        include_relevance=True,   # But relevance scoring could be valuable
        include_authority=True,
    )
else:
    enriched_results = report_search_results  # Use raw results
```
Implementation Roadmap

Phased rollout of architecture improvements. Each Quick Win can be implemented independently.

| Phase | Quick Win | Effort | Files Changed | Priority |
|---|---|---|---|---|
| Phase 1 | Extract search_service.py | 4 hours | 3 files | Critical |
| Phase 1 | Unify geo_saves schema | 2 hours | 8 files + migration | Critical |
| Phase 2 | Centralize progress tracking | 3 hours | 5 files + migration | High |
| Phase 2 | Extract classification service | 3 hours | 4 files | High |

Estimated Total Effort: 12 hours
All four Quick Wins can be completed in 1-2 days. No breaking changes to existing functionality. Each can be tested independently.
Architectural Considerations
No Caching (User Decision)

Search result caching was rejected: SERP queries are cheap and the client wants live data, so each query executes a fresh API call.

Services Stay Separate

Mentions and Reports run independently, but they can share data for the same site via save_id linking.
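As an illustration of save_id linking, combining independently produced report and scan records for the same site might look like the sketch below (the data shapes and function name are assumptions, not the actual GEO code):

```python
from typing import Any, Dict, List


def link_by_save(reports: List[Dict[str, Any]],
                 scans: List[Dict[str, Any]]) -> List[Dict[str, Any]]:
    """Attach each report's mention scans, matched on the shared save_id."""
    scans_by_save: Dict[int, List[Dict[str, Any]]] = {}
    for scan in scans:
        scans_by_save.setdefault(scan["save_id"], []).append(scan)
    return [
        {**report, "mention_scans": scans_by_save.get(report["save_id"], [])}
        for report in reports
    ]
```

The same join could of course be done in SQL across geo.reports and geo.mention_scans once both tables live in the geo schema.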

Sentiment as Optional Layer

Classification is not forced: the mentions pipeline always uses it, while reports can enable it optionally for specific use cases.

Schema Isolation

All GEO tables are moving to the geo.* schema; shared infrastructure (auth, async ops) stays in shared.*.
