at_eof bug fix
This commit is contained in:
File diff suppressed because it is too large
Load Diff
@@ -1215,33 +1215,39 @@ class MetadataExtractor:
|
|||||||
try:
|
try:
|
||||||
if self.connection_manager:
|
if self.connection_manager:
|
||||||
import asyncio
|
import asyncio
|
||||||
|
|
||||||
# Try to run the async query
|
|
||||||
try:
|
|
||||||
# Check if there's a running event loop
|
|
||||||
loop = asyncio.get_running_loop()
|
|
||||||
# If we're in an async context, we need to run in a separate thread
|
|
||||||
import concurrent.futures
|
import concurrent.futures
|
||||||
|
import threading
|
||||||
|
|
||||||
|
# Always run in a separate thread with new event loop to avoid conflicts
|
||||||
def run_in_new_loop():
|
def run_in_new_loop():
|
||||||
|
# Create new event loop for this thread
|
||||||
new_loop = asyncio.new_event_loop()
|
new_loop = asyncio.new_event_loop()
|
||||||
asyncio.set_event_loop(new_loop)
|
asyncio.set_event_loop(new_loop)
|
||||||
try:
|
try:
|
||||||
return new_loop.run_until_complete(
|
return new_loop.run_until_complete(
|
||||||
self._execute_query_async(query, db_name, return_dataframe)
|
self._execute_query_async(query, db_name, return_dataframe)
|
||||||
)
|
)
|
||||||
|
finally:
|
||||||
|
try:
|
||||||
|
# Properly close the loop
|
||||||
|
pending = asyncio.all_tasks(new_loop)
|
||||||
|
if pending:
|
||||||
|
new_loop.run_until_complete(asyncio.gather(*pending, return_exceptions=True))
|
||||||
finally:
|
finally:
|
||||||
new_loop.close()
|
new_loop.close()
|
||||||
|
|
||||||
with concurrent.futures.ThreadPoolExecutor() as executor:
|
# Use ThreadPoolExecutor to run in separate thread
|
||||||
|
with concurrent.futures.ThreadPoolExecutor(max_workers=1) as executor:
|
||||||
future = executor.submit(run_in_new_loop)
|
future = executor.submit(run_in_new_loop)
|
||||||
|
try:
|
||||||
return future.result(timeout=30)
|
return future.result(timeout=30)
|
||||||
|
except concurrent.futures.TimeoutError:
|
||||||
except RuntimeError:
|
logger.error("Query execution timed out after 30 seconds")
|
||||||
# No running loop, we can safely create one
|
if return_dataframe:
|
||||||
return asyncio.run(
|
import pandas as pd
|
||||||
self._execute_query_async(query, db_name, return_dataframe)
|
return pd.DataFrame()
|
||||||
)
|
else:
|
||||||
|
return []
|
||||||
else:
|
else:
|
||||||
# Fallback: Return empty result
|
# Fallback: Return empty result
|
||||||
logger.warning("No connection manager provided, returning empty result")
|
logger.warning("No connection manager provided, returning empty result")
|
||||||
|
|||||||
Reference in New Issue
Block a user