SubTask_solve_run_parallel

Running sub-tasks in parallel in FastAPI

Given the constraint that GlobalmixRequest and GlobalmixResponse are complex JSON objects whose structure you cannot modify, concurrent execution is still possible. The key is to process each sub-task contained in the large JSON object concurrently.

We'll assume the GlobalmixRequest contains identifiable sub-tasks that can run independently. The approach has three parts: extract the sub-tasks from the request object, run them concurrently with a ThreadPoolExecutor, and aggregate the results into the required GlobalmixResponse format.

Step-by-Step Implementation:

  • Define the Pydantic models: GlobalmixRequest and GlobalmixResponse are assumed to be already defined in schemas.py (a hypothetical sketch is given after this list).

  • Extract Sub-tasks: Implement a function to extract sub-tasks from the GlobalmixRequest.

  • Concurrent Execution: Use ThreadPoolExecutor to run these sub-tasks concurrently.

  • Combine Results: Aggregate the results into the GlobalmixResponse.
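
For illustration, here is a minimal, hypothetical schemas.py. The field names tasks and results are assumptions made for this example; substitute the actual structure of your models.

# schemas.py -- hypothetical sketch; field names are assumptions
from typing import Any
from pydantic import BaseModel

class GlobalmixRequest(BaseModel):
    # assumed: each entry in 'tasks' is one independently solvable sub-task
    tasks: list[dict[str, Any]]

class GlobalmixResponse(BaseModel):
    # assumed: one result string per sub-task
    results: list[str]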

Example implementation:

from fastapi import FastAPI, HTTPException
from fastapi.concurrency import run_in_threadpool
from concurrent.futures import ThreadPoolExecutor, as_completed
from schemas import GlobalmixRequest, GlobalmixResponse
import traceback
import logging

app = FastAPI()
logger = logging.getLogger(__name__)

# Dummy solver function
def solve_task(sub_task):
    # Replace this with your actual (potentially time-consuming) solve logic
    return f"Result of solving with sub-task {sub_task}"

# Run multiple solves concurrently in a thread pool
def run_parallel_solves(data):
    results = []
    with ThreadPoolExecutor(max_workers=30) as executor:
        # Assuming data['tasks'] contains the sub-tasks to be processed concurrently
        futures = {executor.submit(solve_task, sub_task): sub_task for sub_task in data['tasks']}
        # Note: as_completed yields futures in completion order, not submission
        # order; sort or key the results here if the response must preserve order
        for future in as_completed(futures):
            sub_task = futures[future]
            try:
                results.append(future.result())
            except Exception as exc:
                results.append(f"Sub-task {sub_task} generated an exception: {exc}")
    return results

@app.post("/parametrics/v1", response_model=GlobalmixResponse, response_model_exclude_none=True)
async def process_data(request: GlobalmixRequest):
    try:
        # Extract parameters from the request model
        data = request.model_dump(mode='json')

        # run_parallel_solves blocks until every future finishes, so offload it
        # to a worker thread instead of blocking the event loop
        results = await run_in_threadpool(run_parallel_solves, data)

        # Construct the response
        # Assuming GlobalmixResponse can be constructed with a dictionary containing a 'results' key
        response_data = {'results': results}
        return GlobalmixResponse(**response_data)
    except Exception:
        msg = f"Internal Server Error\n{traceback.format_exc()}"
        logger.error(msg)
        # Returning the traceback is convenient while debugging; strip it in production
        raise HTTPException(status_code=500, detail=msg)

# To run the FastAPI app, use the following command in the terminal:
# uvicorn script_name:app --host 0.0.0.0 --port 8000 --reload
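
A quick way to exercise the endpoint is FastAPI's TestClient. The payload below is hypothetical and matches the assumed tasks/results fields from the sketch above; script_name is a placeholder for your module.

# Hypothetical smoke test for the endpoint above
from fastapi.testclient import TestClient
from script_name import app  # replace with your actual module name

client = TestClient(app)
payload = {"tasks": [{"id": 1}, {"id": 2}, {"id": 3}]}
response = client.post("/parametrics/v1", json=payload)
print(response.status_code)  # 200 when all sub-tasks are solved
print(response.json())       # e.g. {'results': [...]}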

Explanation:

Extract sub-task data: run_parallel_solves reads the sub-tasks from data['tasks']; adjust this key to match your actual data structure. Each sub-task from data['tasks'] is then submitted to the ThreadPoolExecutor and executed concurrently, with results collected as the futures complete.
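
ThreadPoolExecutor is the right fit when solve_task is blocking code (synchronous I/O, or a native solver that releases the GIL). If each sub-task were instead an awaitable coroutine, asyncio.gather could run them on the event loop without threads. The sketch below is a hypothetical alternative, not part of the solution above; solve_task_async and the /parametrics/v2 route are invented names.

import asyncio

# Hypothetical async variant of the solver
async def solve_task_async(sub_task):
    await asyncio.sleep(0)  # stand-in for real awaitable work (e.g. an HTTP call)
    return f"Result of solving with sub-task {sub_task}"

@app.post("/parametrics/v2", response_model=GlobalmixResponse, response_model_exclude_none=True)
async def process_data_async(request: GlobalmixRequest):
    data = request.model_dump(mode='json')
    # gather runs all coroutines concurrently and, unlike as_completed,
    # returns results in submission order
    results = await asyncio.gather(*(solve_task_async(t) for t in data['tasks']))
    return GlobalmixResponse(results=list(results))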
