🌍 Public Sentiment Collection Agent

Geographic Sentiment Analysis with Credibility Tracking

Python 3.9+ · Gemini 2.0 Flash · Tavily API · Pandas · Matplotlib

🔍 About This Code Showcase

This curated code snippet demonstrates how the Public Sentiment Collection Agent performs geographic sentiment analysis with advanced credibility tracking and source diversity assessment.

Full deployment scripts, API credentials, and proprietary details are omitted for clarity and security. This showcase highlights the core multi-agent orchestration and sentiment analysis algorithms.

📖 Core Algorithm: Geographic Social Listening

The foundation of the Public Sentiment Agent is its ability to collect sentiment data with geographic segmentation and track source diversity for credibility assessment:

🌐 geographic_listening_agent.py
```python
from datetime import datetime

# tavily_search_geographic, analyze_source_diversity, and the log_*_html
# helpers are defined elsewhere in the project and omitted from this showcase.

def geographic_listening_agent(
    issue_keyword: str,
    locations: list = ["global"],
    num_sources_per_location: int = 15
) -> dict:
    """
    Collect sentiment data with geographic segmentation and source
    diversity tracking.

    This agent addresses a critical flaw in traditional sentiment analysis:
    lumping all geographic regions together without considering cultural
    context.

    Example: "Public opinion on alcohol consumption"
    - WITHOUT geographic filtering: misleading global average
    - WITH geographic filtering: reveals cultural nuances
      (Saudi Arabia vs Germany)

    Args:
        issue_keyword: Topic to research
        locations: List of countries/regions (e.g., ["USA", "Germany", "Saudi Arabia"])
        num_sources_per_location: Sources per location

    Returns:
        dict: Data organized by location with diversity metrics
    """
    log_agent_title_html("Geographic Social Listening Agent", "🌍")

    location_data = {}

    for location in locations:
        log_tool_call_html("tavily_search_geographic", f"location={location}")

        # Search with location-specific filtering
        search_result = tavily_search_geographic(
            query=issue_keyword,
            location=location,
            max_results=num_sources_per_location
        )

        if 'error' not in search_result:
            results = search_result['results']

            # CRITICAL: Analyze source diversity to detect bias
            diversity = analyze_source_diversity(results)

            location_data[location] = {
                'snippets': [r['content'] for r in results if r.get('content')],
                'sources': [{'title': r['title'], 'url': r['url'], 'domain': r['domain']}
                            for r in results],
                'diversity': diversity
            }

            # Warn if data quality is compromised
            if diversity['warnings']:
                for warning in diversity['warnings']:
                    log_warning_html(f"{location}: {warning}")

    return {
        'issue': issue_keyword,
        'location_data': location_data,
        'collection_date': datetime.now().strftime('%Y-%m-%d %H:%M:%S')
    }
```
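For orientation, here is the shape of the dictionary this agent returns. The field names are taken from the code above; the concrete values below are placeholders for illustration, not real collected data:

```python
# Illustrative payload shape (placeholder values only).
# 'diversity' holds the full metric set produced by analyze_source_diversity,
# shown in the next section.
example_payload = {
    'issue': 'Public opinion on alcohol consumption',
    'collection_date': '2025-01-01 12:00:00',
    'location_data': {
        'Germany': {
            'snippets': ['<article excerpt>', '<forum post excerpt>'],
            'sources': [
                {'title': '<headline>', 'url': '<source url>', 'domain': '<domain>'},
            ],
            'diversity': {'diversity_score': 60.0, 'warnings': []},
        },
    },
}
```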

🔍 Source Diversity Analysis Engine

The system automatically detects data quality issues by analyzing source diversity and concentration bias:

📊 source_diversity_analyzer.py
```python
from collections import Counter

def analyze_source_diversity(results: list) -> dict:
    """
    Analyze diversity of information sources and detect bias patterns.

    This prevents the "echo chamber problem" where all data comes from a
    single source type (e.g., 90% social media) or domain.

    Returns:
        dict: Diversity metrics and quality warnings
    """
    domains = [r.get('domain', 'unknown') for r in results]
    domain_counts = Counter(domains)

    # Classify source types using domain patterns
    source_types = []
    for domain in domains:
        if any(x in domain for x in ['reddit.com', 'twitter.com', 'facebook.com']):
            source_types.append('social_media')
        elif any(x in domain for x in ['news', 'times', 'post', 'bbc', 'cnn']):
            source_types.append('news')
        elif any(x in domain for x in ['.gov', '.edu']):
            source_types.append('institutional')
        else:
            source_types.append('other')

    type_counts = Counter(source_types)

    # Calculate diversity score (0-100, higher = more diverse)
    unique_domains = len(domain_counts)
    total_sources = len(domains)
    diversity_score = min(100, (unique_domains / max(1, total_sources)) * 150)

    # Detect concentration bias and issue warnings
    warnings = []
    most_common_domain, max_count = (
        domain_counts.most_common(1)[0] if domain_counts else ('', 0)
    )
    if max_count > total_sources * 0.4:
        warnings.append(
            f"⚠️ {(max_count/total_sources)*100:.0f}% of sources from single domain: {most_common_domain}"
        )
    if type_counts.get('social_media', 0) > total_sources * 0.7:
        warnings.append("⚠️ Over 70% of sources are social media (potential echo chamber bias)")
    if unique_domains < 5:
        warnings.append(f"⚠️ Only {unique_domains} unique sources (low diversity)")

    return {
        'diversity_score': round(diversity_score, 1),
        'unique_domains': unique_domains,
        'total_sources': total_sources,
        'source_type_distribution': dict(type_counts),
        'top_domains': dict(domain_counts.most_common(5)),
        'warnings': warnings
    }
```
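To see how the warning thresholds behave, here is a small self-contained check using placeholder domains (a hypothetical input, not real collected data). A sample dominated by a single social-media domain should trip all three warnings:

```python
# Hypothetical sample: 7 of 10 results from one social-media domain.
sample = (
    [{'domain': 'reddit.com'}] * 7
    + [{'domain': 'twitter.com'}, {'domain': 'bbc.co.uk'}, {'domain': 'stats.gov'}]
)

report = analyze_source_diversity(sample)
print(report['diversity_score'])   # 60.0 for this sample (4 unique / 10 total * 150, capped at 100)
for w in report['warnings']:
    print(w)                       # domain concentration, echo chamber, low diversity
```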

🧠 Comparative Sentiment Analysis Engine

The sentiment analysis engine processes data separately for each location and calculates credibility scores:

💡 comparative_sentiment_agent.py
```python
from collections import Counter

# analyze_sentiment_with_context and the log_*_html helpers are defined
# elsewhere in the project and omitted from this showcase.

def comparative_sentiment_agent(listening_data: dict) -> dict:
    """
    Analyze sentiment separately for each location with credibility tracking.

    Credibility Score = (Source Diversity * 0.6) + (Sample Size * 0.4)

    This ensures findings are weighted by data quality, not just
    sentiment counts.
    """
    log_agent_title_html("Comparative Sentiment Analysis Agent", "🧠")

    issue = listening_data['issue']
    location_data = listening_data['location_data']

    location_sentiments = {}

    for location, data in location_data.items():
        snippets = data['snippets']
        if not snippets:
            log_warning_html(f"No data for {location}, skipping...")
            continue

        # Analyze sentiment with cultural context awareness
        analyses = analyze_sentiment_with_context(snippets, issue)

        # Calculate statistics
        sentiments = [a.get('sentiment', 'neutral') for a in analyses if 'error' not in a]
        emotions = [a.get('emotion', 'neutral') for a in analyses if 'error' not in a]

        sentiment_counts = Counter(sentiments)
        total = len(sentiments) if sentiments else 1

        sentiment_dist = {
            'positive': (sentiment_counts.get('positive', 0) / total) * 100,
            'negative': (sentiment_counts.get('negative', 0) / total) * 100,
            'neutral': (sentiment_counts.get('neutral', 0) / total) * 100
        }

        # CRITICAL: Calculate credibility score (0-100).
        # This weights findings by data quality, not just raw sentiment.
        diversity_score = data['diversity']['diversity_score']
        sample_size_score = min(100, (len(snippets) / 20) * 100)
        credibility_score = (diversity_score * 0.6 + sample_size_score * 0.4)

        location_sentiments[location] = {
            'sentiment_distribution': sentiment_dist,
            'emotion_counts': dict(Counter(emotions)),
            'sample_size': len(sentiments),
            'credibility_score': round(credibility_score, 1),
            'diversity_metrics': data['diversity']
        }

        log_tool_result_html(
            f"{location}: Pos={sentiment_dist['positive']:.0f}% "
            f"Neg={sentiment_dist['negative']:.0f}% | Credibility: {credibility_score:.0f}/100"
        )

    return {
        'issue': issue,
        'location_sentiments': location_sentiments
    }
```
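The credibility formula is simple enough to verify by hand. A quick worked example with illustrative numbers (assuming the diversity score came out of analyze_source_diversity above):

```python
# Worked example of the credibility formula (illustrative values only):
# a location with diversity_score = 60.0 and 10 collected snippets.
diversity_score = 60.0
sample_size_score = min(100, (10 / 20) * 100)   # -> 50.0 (20+ snippets saturate at 100)

credibility_score = diversity_score * 0.6 + sample_size_score * 0.4
print(credibility_score)                         # -> 56.0
```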

⚙️ Technical Implementation Notes

Key Algorithms & Innovations

- Geographic segmentation: sentiment is collected and analyzed per location rather than as a single global pool.
- Source diversity scoring: each location's sources receive a 0-100 diversity score, with automatic warnings for domain concentration, social-media echo chambers, and low source counts.
- Credibility weighting: every finding carries a credibility score (source diversity * 0.6 + sample size * 0.4), so conclusions reflect data quality rather than raw sentiment counts.

Why This Approach Works

Traditional sentiment pipelines lump all regions together and treat every source equally. By segmenting collection by location and attaching diversity and credibility metrics to each finding, the agent surfaces cultural nuance and flags low-quality samples before they can mislead downstream analysis.

Real-World Example

Topic: "Public opinion on alcohol consumption"

A single global average on this topic is misleading because it blends fundamentally different cultural contexts. Segmenting the collection (e.g., Germany vs Saudi Arabia, as in the docstring above) reveals the divergence, and each location's sentiment distribution is reported with its own credibility score.
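Putting the pieces together, a minimal end-to-end run for this topic might look like the sketch below. This is hypothetical wiring for illustration: it assumes both agent functions above, plus the omitted helpers (tavily_search_geographic, analyze_sentiment_with_context, log_*_html), are importable in the current environment.

```python
# Hypothetical end-to-end sketch combining the two agents shown above.
listening_data = geographic_listening_agent(
    issue_keyword="Public opinion on alcohol consumption",
    locations=["Germany", "Saudi Arabia"],
)

analysis = comparative_sentiment_agent(listening_data)

# Report each location's sentiment split alongside its credibility score.
for location, result in analysis['location_sentiments'].items():
    dist = result['sentiment_distribution']
    print(
        f"{location}: positive={dist['positive']:.0f}% "
        f"negative={dist['negative']:.0f}% "
        f"(credibility {result['credibility_score']}/100, n={result['sample_size']})"
    )
```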