"""
Streaming Integration Tests

These tests validate streaming behavior by connecting to a running service.
They focus on real-time response patterns and streaming event handling.

The target service is taken from the ``AGENTIC_RAG_SERVICE_URL`` environment
variable and falls back to ``http://127.0.0.1:8000``.
"""

import asyncio
import os
import time
from typing import Any, AsyncContextManager, Dict, List

import httpx
import pytest

# Configuration for remote service connection
DEFAULT_SERVICE_URL = "http://127.0.0.1:8000"
SERVICE_URL = os.getenv("AGENTIC_RAG_SERVICE_URL", DEFAULT_SERVICE_URL)

_CHAT_PATH = "/api/chat"
_AI_SDK_CHAT_PATH = "/api/ai-sdk/chat"
_JSON_HEADERS = {"Content-Type": "application/json"}


def _chat_payload(session_id: str, content: str) -> Dict[str, Any]:
    """Build the JSON body for a single-user-message chat request."""
    return {
        "session_id": session_id,
        "messages": [{"role": "user", "content": content}],
    }


def _open_stream(
    client: httpx.AsyncClient, url: str, payload: Dict[str, Any]
) -> AsyncContextManager[httpx.Response]:
    """Open a POST request whose body is consumed incrementally.

    ``client.post`` reads the complete body before returning, after which
    ``aiter_text()`` replays it as one buffered chunk.  ``client.stream``
    returns as soon as the headers arrive, so chunk counts and timings
    reflect what the service actually sends.
    """
    return client.stream("POST", url, json=payload, headers=_JSON_HEADERS)


async def _read_chunks(response: httpx.Response, max_chunks: int) -> List[str]:
    """Return up to ``max_chunks`` text chunks from a streaming response."""
    chunks: List[str] = []
    async for chunk in response.aiter_text():
        chunks.append(chunk)
        if len(chunks) >= max_chunks:
            break
    return chunks


async def _read_text(response: httpx.Response, min_chars: int) -> str:
    """Read chunks until more than ``min_chars`` characters were received."""
    parts: List[str] = []
    received = 0
    async for chunk in response.aiter_text():
        parts.append(chunk)
        received += len(chunk)
        if received > min_chars:
            break
    return "".join(parts)


@pytest.fixture(scope="session")
def service_url() -> str:
    """Get the service URL for testing"""
    return SERVICE_URL


class TestStreamingBehavior:
    """Test streaming response behavior"""

    @pytest.mark.asyncio
    async def test_basic_streaming_response(self, service_url: str):
        """Test that responses are properly streamed"""
        payload = _chat_payload(
            f"streaming_test_{int(time.time())}", "What is ISO 26262?"
        )

        async with httpx.AsyncClient(timeout=60.0) as client:
            async with _open_stream(
                client, f"{service_url}{_CHAT_PATH}", payload
            ) as response:
                assert response.status_code == 200
                # Get enough chunks to verify streaming
                chunks = await _read_chunks(response, max_chunks=11)

        # Should receive multiple chunks (indicating streaming)
        assert len(chunks) > 1
        # Chunks should have content
        assert len("".join(chunks)) > 0

    @pytest.mark.asyncio
    async def test_ai_sdk_streaming_format(self, service_url: str):
        """Test AI SDK compatible streaming format"""
        payload = _chat_payload(
            f"ai_sdk_streaming_{int(time.time())}",
            "Explain vehicle safety testing",
        )

        async with httpx.AsyncClient(timeout=60.0) as client:
            async with _open_stream(
                client, f"{service_url}{_AI_SDK_CHAT_PATH}", payload
            ) as response:
                assert response.status_code == 200
                assert "text/plain" in response.headers.get("content-type", "")
                # Collect enough chunks
                chunks = await _read_chunks(response, max_chunks=16)

        # Verify streaming characteristics
        assert len(chunks) > 1  # Multiple chunks
        assert sum(len(chunk) for chunk in chunks) > 50  # Meaningful content

    @pytest.mark.asyncio
    async def test_streaming_performance(self, service_url: str):
        """Test streaming response timing and performance"""
        payload = _chat_payload(
            f"streaming_perf_{int(time.time())}",
            "What are automotive safety standards?",
        )

        async with httpx.AsyncClient(timeout=60.0) as client:
            # perf_counter is monotonic, so the interval cannot be skewed by
            # wall-clock adjustments while the request is in flight.
            started_at = time.perf_counter()
            first_chunk_at = None
            chunk_count = 0
            async with _open_stream(
                client, f"{service_url}{_CHAT_PATH}", payload
            ) as response:
                assert response.status_code == 200
                async for _chunk in response.aiter_text():
                    if first_chunk_at is None:
                        first_chunk_at = time.perf_counter()
                    chunk_count += 1
                    if chunk_count > 5:  # Get a few chunks for timing
                        break

        # An empty stream must fail instead of silently skipping the check.
        assert first_chunk_at is not None, "stream produced no chunks"
        # Time to first chunk should be reasonable (< 10 seconds)
        assert first_chunk_at - started_at < 10.0

    @pytest.mark.asyncio
    async def test_streaming_interruption_handling(self, service_url: str):
        """Test behavior when streaming is interrupted"""
        payload = _chat_payload(
            f"streaming_interrupt_{int(time.time())}",
            "Tell me about ISO standards",
        )

        async with httpx.AsyncClient(timeout=60.0) as client:
            async with _open_stream(
                client, f"{service_url}{_CHAT_PATH}", payload
            ) as response:
                assert response.status_code == 200
                # Read only a few chunks, then leave the context manager,
                # which closes the response before the body is exhausted.
                chunks = await _read_chunks(response, max_chunks=3)

        # Should have received some chunks
        assert len(chunks) > 0


class TestConcurrentStreaming:
    """Test concurrent streaming scenarios"""

    @pytest.mark.asyncio
    async def test_multiple_concurrent_streams(self, service_url: str):
        """Test multiple concurrent streaming requests"""
        base_time = int(time.time())

        async def stream_request(session_suffix: str, question: str) -> int:
            """Make a single streaming request and return its chunk count"""
            payload = _chat_payload(
                f"concurrent_stream_{base_time}_{session_suffix}", question
            )
            async with httpx.AsyncClient(timeout=60.0) as client:
                async with _open_stream(
                    client, f"{service_url}{_CHAT_PATH}", payload
                ) as response:
                    assert response.status_code == 200
                    # Read some chunks
                    return len(await _read_chunks(response, max_chunks=6))

        # Run multiple concurrent streams
        questions = [
            "What is ISO 26262?",
            "Explain NIST framework",
            "What is GDPR?",
        ]
        tasks = [
            stream_request(f"session_{i}", question)
            for i, question in enumerate(questions)
        ]
        results = await asyncio.gather(*tasks, return_exceptions=True)

        # All streams should complete successfully
        assert len(results) == len(questions)
        for result in results:
            assert not isinstance(result, BaseException), repr(result)
            assert result > 0  # Each stream should receive chunks

    @pytest.mark.asyncio
    async def test_same_session_rapid_requests(self, service_url: str):
        """Test rapid requests in the same session"""
        session_id = f"rapid_session_{int(time.time())}"
        questions = [
            "Hello",
            "What is ISO 9001?",
            "Thank you",
        ]

        async with httpx.AsyncClient(timeout=60.0) as client:
            for number, question in enumerate(questions, start=1):
                payload = _chat_payload(session_id, question)
                async with _open_stream(
                    client, f"{service_url}{_CHAT_PATH}", payload
                ) as response:
                    assert response.status_code == 200
                    # Read some response
                    chunk_count = len(
                        await _read_chunks(response, max_chunks=4)
                    )

                print(f"Request {number} completed with {chunk_count} chunks")

                # Very short delay
                await asyncio.sleep(0.2)


class TestStreamingErrorHandling:
    """Test error handling during streaming"""

    @pytest.mark.asyncio
    async def test_streaming_with_invalid_session(self, service_url: str):
        """Test streaming behavior with edge case session IDs"""
        test_cases = [
            "",  # Empty session ID
            "a" * 1000,  # Very long session ID
            "session with spaces",  # Session ID with spaces
            "session/with/slashes",  # Session ID with special chars
        ]

        async with httpx.AsyncClient(timeout=60.0) as client:
            for session_id in test_cases:
                payload = _chat_payload(session_id, "Hello")
                try:
                    async with _open_stream(
                        client, f"{service_url}{_CHAT_PATH}", payload
                    ) as response:
                        status_code = response.status_code
                except httpx.HTTPError as e:
                    # Some edge cases might cause request-level failures,
                    # which is acceptable.  Only httpx errors are tolerated
                    # so that a failed assertion is never swallowed.
                    print(f"Session ID '{session_id}' caused exception: {e}")
                    continue

                # Should either work or return validation error
                assert status_code in (200, 422)

    @pytest.mark.asyncio
    async def test_streaming_with_large_messages(self, service_url: str):
        """Test streaming with large message content"""
        # Create a large message
        large_content = "Please explain safety standards. " * 100  # ~3KB message
        payload = _chat_payload(
            f"large_msg_stream_{int(time.time())}", large_content
        )

        async with httpx.AsyncClient(timeout=90.0) as client:
            async with _open_stream(
                client, f"{service_url}{_CHAT_PATH}", payload
            ) as response:
                # Should handle large messages appropriately
                assert response.status_code in (200, 413, 422)

                if response.status_code == 200:
                    # If accepted, should stream properly
                    chunks = await _read_chunks(response, max_chunks=6)
                    assert len(chunks) > 0


class TestStreamingContentValidation:
    """Test streaming content quality and format"""

    @pytest.mark.asyncio
    async def test_streaming_content_encoding(self, service_url: str):
        """Test that streaming content is properly encoded"""
        # Test with special characters and unicode
        test_message = (
            "What is ISO 26262? Please explain with émphasis on safety ñorms."
        )
        payload = _chat_payload(
            f"encoding_test_{int(time.time())}", test_message
        )

        async with httpx.AsyncClient(timeout=60.0) as client:
            async with _open_stream(
                client, f"{service_url}{_CHAT_PATH}", payload
            ) as response:
                assert response.status_code == 200
                # Collect content and verify encoding
                content = await _read_text(response, min_chars=100)

        # Content should be valid UTF-8
        assert isinstance(content, str)
        assert len(content) > 0

        # Should be able to encode/decode
        encoded = content.encode("utf-8")
        decoded = encoded.decode("utf-8")
        assert decoded == content

    @pytest.mark.asyncio
    async def test_streaming_response_consistency(self, service_url: str):
        """Test that streaming responses are consistent for similar queries"""
        base_session = f"consistency_test_{int(time.time())}"

        # Ask the same question multiple times
        test_question = "What is ISO 26262?"
        responses = []

        async with httpx.AsyncClient(timeout=60.0) as client:
            for i in range(3):
                payload = _chat_payload(f"{base_session}_{i}", test_question)
                async with _open_stream(
                    client, f"{service_url}{_CHAT_PATH}", payload
                ) as response:
                    assert response.status_code == 200
                    # Collect response
                    responses.append(await _read_text(response, min_chars=200))

                await asyncio.sleep(0.5)

        # All responses should have content
        for response_content in responses:
            assert len(response_content) > 50

        # Responses should have some consistency (all non-empty)
        assert all(r.strip() for r in responses)