Documentation AIME API Worker Interface
- class aime_api_worker_interface.APIWorkerInterface(api_server, job_type, auth_key, gpu_id=0, world_size=1, rank=0, gpu_name=None, image_metadata_params=['prompt', 'negative_prompt', 'seed', 'base_steps', 'refine_steps', 'scale', 'aesthetic_score', 'negative_aesthetic_score', 'img2img_strength', 'base_sampler', 'refine_sampler', 'base_discretization', 'refine_discretization'], print_server_status=True, request_timeout=60, worker_version=0, exit_callback=None)
Interface for deep learning models to communicate with AIME API Server.
- Parameters:
api_server (str) – Address of API Server. Example: ‘http://api.aime.team’.
job_type (str) – Type of job. Example: “stable_diffusion_xl_txt2img”.
auth_key (str) – Key to authorize the worker to connect with the API Server.
gpu_id (int, optional) – ID of GPU the worker runs on. Defaults to 0.
world_size (int, optional) – Number of used GPUs the worker runs on. Defaults to 1.
rank (int, optional) – ID of current GPU if world_size > 1. Defaults to 0.
gpu_name (str, optional) – Name of GPU the worker runs on. Defaults to None.
progress_received_callback (callable, optional) – Callback function with the HTTP response as argument, called when the API server has responded to send_progress(). Defaults to None.
progress_error_callback (callable, optional) – Callback function with requests.exceptions.ConnectionError as argument, called when the API server did not respond to send_progress(). Defaults to None.
image_metadata_params (list, optional) – Parameters specific for the used image generator model to add as metadata to the generated image. Fixed parameters are Artist, ProcessingSoftware, Software, ImageEditingSoftware = AIME API <endpoint_name>. Defaults to aime_api_worker_interface.DEFAULT_IMAGE_METADATA.
- progress_data_received
True if the API server has responded to send_progress(); False while progress data is being transmitted or if an error occurred.
- Type:
bool
Examples
Minimal example: instantiate the api_worker with the URL of the API server, the job type and the auth_key, then wait for job data and send the job result:
from aime_api_worker_interface import APIWorkerInterface

api_worker = APIWorkerInterface('http://api.aime.team', 'llama2_chat', <auth_key>)
while True:
    job_data = api_worker.job_request()
    output = do_deep_learning_worker_calculations(job_data, ...)
    api_worker.send_job_results(output)
Example usage with progress:
from aime_api_worker_interface import APIWorkerInterface

api_worker = APIWorkerInterface('http://api.aime.team', 'llama2_chat', <auth_key>)
while True:
    job_data = api_worker.job_request()
    for step in deep_learning_worker_calculation:
        progress_in_percent = round(step*100/len(deep_learning_worker_calculation))
        progress_data = do_deep_learning_worker_calculation_step(job_data, ...)
        if api_worker.progress_data_received:
            api_worker.send_progress(progress_in_percent, progress_data)
    output = get_result()
    api_worker.send_job_results(output)
Example usage with callback:
from aime_api_worker_interface import APIWorkerInterface

def progress_callback(api_worker, progress, progress_data):
    # Only send new progress once the previous progress update was received
    if api_worker.progress_data_received:
        api_worker.send_progress(progress, progress_data)

api_worker = APIWorkerInterface('http://api.aime.team', 'llama2_chat', <auth_key>)
while True:
    job_data = api_worker.job_request()
    output = do_deep_learning_worker_calculation(job_data, progress_callback, api_worker, ...)
    api_worker.send_job_results(output)
Example usage with callback class:
from aime_api_worker_interface import APIWorkerInterface

class Callback():

    def __init__(self, api_worker):
        self.api_worker = api_worker

    def progress_callback(self, progress, progress_data):
        if self.api_worker.progress_data_received:
            self.api_worker.send_progress(progress, progress_data)

    def result_callback(self, result):
        self.api_worker.send_job_results(result)

api_worker = APIWorkerInterface('http://api.aime.team', 'llama2_chat', <auth_key>)
callback = Callback(api_worker)
while True:
    job_data = api_worker.job_request()
    do_deep_learning_worker_calculation(job_data, callback.result_callback, callback.progress_callback, ...)
- job_request()
Worker requests a single job from the API Server on endpoint route /worker_job_request.
Calls job_batch_request() with max_job_batch set to 1 and returns the first job.
See job_batch_request() for more information.
- Returns:
job data with worker [INPUT] parameters received from API server.
- Return type:
dict
- job_batch_request(max_job_batch, wait_for_response=True, callback=None, error_callback=None)
Worker requests a job batch from API Server on endpoint route /worker_job_request.
If no client job is offered within job_timeout = request_timeout * 0.9, the API server responds with 'cmd': 'no_job' and the worker requests a job again on endpoint route /worker_job_request.
In multi-GPU mode (world_size > 1) only rank 0 receives the job_data.
- Parameters:
max_job_batch (int) – Maximum job batch size to request.
wait_for_response (bool, optional) – Whether the method blocks until the API Server response is received. If set to False, callback and error_callback are used to deliver the response. Defaults to True.
callback (callable, optional) – Callback function with API server response as argument. Called when the job request has been received by the API Server, if wait_for_response is set to False. Defaults to None.
error_callback (callable, optional) – Callback function with requests.exceptions.ConnectionError or HTTP response with status_code == 503 as argument. Called when the API server replied with an error, if wait_for_response is set to False. Defaults to None.
- Returns:
List of job data with worker [INPUT] parameters received from API server.
- Return type:
list
Examples
Example job data:
response_data = {
    'wait_for_result': False,
    'endpoint_name': 'stable_diffusion_xl_txt2img',
    'start_time': 1700430052.505548,
    'start_time_compute': 1700430052.5124364,
    'cmd': 'job',
    'job_data': [{
        'job_id': 'JID1',
        'prompt': 'prompt',
        ...
    }],
    'progress_descriptions': {
        'progress_images': {
            'type': 'image_list',
            'image_format': 'JPEG',
            'color_space': 'RGB'
        }
    },
    'output_descriptions': {
        'images': {'type': 'image_list', 'image_format': 'JPEG', 'color_space': 'RGB'},
        'seed': {'type': 'integer'},
        'prompt': {'type': 'string'},
        'error': {'type': 'string'}
    }
}
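The 'no_job' retry behaviour described for job_batch_request() can be sketched as follows. This is a minimal illustration under stated assumptions, not the library's implementation: fetch_job_response is a hypothetical stand-in for the HTTP request to /worker_job_request, and max_attempts exists only so that the sketch terminates.

```python
def compute_job_timeout(request_timeout):
    """Return the job timeout derived from the request timeout (90 percent of it)."""
    return request_timeout * 0.9


def request_job_batch(fetch_job_response, max_attempts=10):
    """Poll until the (hypothetical) server call returns a job batch.

    fetch_job_response is an assumed callable returning a dict such as
    {'cmd': 'no_job'} or {'cmd': 'job', 'job_data': [...]}.
    """
    for _ in range(max_attempts):
        response = fetch_job_response()
        if response.get('cmd') == 'job':
            return response.get('job_data', [])
        # 'no_job': no client offered a job within job_timeout, so ask again.
    return []


# Simulated server: two timeouts, then a batch containing one job.
_responses = iter([
    {'cmd': 'no_job'},
    {'cmd': 'no_job'},
    {'cmd': 'job', 'job_data': [{'job_id': 'JID1', 'prompt': 'prompt'}]},
])
job_batch = request_job_batch(lambda: next(_responses))
print(compute_job_timeout(60), job_batch)
```

With the default request_timeout of 60 seconds, the server-side wait in this sketch works out to roughly 54 seconds per request.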
- job_request_generator(max_job_batch)
Generator yielding the related job_batch_data whenever there are new job requests, or an empty list if there are running jobs but no new job requests. Blocks if there are neither running jobs nor new job requests, to spare hardware resources.
- Parameters:
max_job_batch (int) – Maximum number of jobs running in parallel.
- Yields:
list – job_batch_data of new job requests or empty list
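A consumer of job_request_generator() has to handle both kinds of yielded values. The sketch below shows one way to do that; fake_job_request_generator and collect_running_jobs are hypothetical names, and the fake generator only mimics the documented yield values without contacting a server.

```python
def fake_job_request_generator(max_job_batch):
    """Hypothetical stand-in: yields lists of new jobs, or [] when nothing new arrived."""
    yield [{'job_id': 'JID1'}, {'job_id': 'JID2'}][:max_job_batch]
    yield []                      # jobs still running, no new job requests
    yield [{'job_id': 'JID3'}]    # a new job request arrived


def collect_running_jobs(generator, iterations):
    """Merge newly yielded jobs into the running jobs for a few iterations."""
    running_jobs = []
    history = []
    for _, new_jobs in zip(range(iterations), generator):
        running_jobs.extend(new_jobs)   # an empty list leaves the running jobs unchanged
        history.append([job['job_id'] for job in running_jobs])
    return history


history = collect_running_jobs(fake_job_request_generator(max_job_batch=2), iterations=3)
print(history)
```

In a real worker, finished jobs would also be removed from the running list after their results were sent.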
- send_job_results(results, job_data={}, job_id=None, wait_for_response=True, callback=None, error_callback=None)
Processes/converts job results and sends them to the API Server on route /worker_job_result.
- Parameters:
results (dict) – Worker [OUTPUT] result parameters (e.g. ‘image’, ‘images’ or ‘text’). Example results:
{'images': [<PIL.Image.Image>, <PIL.Image.Image>, ...]}
job_data (dict, optional) – To use different job_data than the received one or to identify the related job if no job_id is given. Defaults to {}.
job_id (str, optional) – To identify the related job. Defaults to None.
wait_for_response (bool, optional) – Whether the method blocks until the API Server response is received. Defaults to True.
callback (callable, optional) – Callback function with API server response as argument. Called when the job result has been received by the API Server, if wait_for_response is set to False. Defaults to None.
error_callback (callable, optional) – Callback function with requests.exceptions.ConnectionError or HTTP response with status_code == 503 as argument. Called when the API server replied with an error, if wait_for_response is set to False. Defaults to None.
- Returns:
Http response from API server to the worker.
- Return type:
requests.models.Response
Examples
Example response.json():
API Server received data without problems:
{'cmd': 'ok'}
An error occurred in API server:
{'cmd': 'error', 'msg': <error message>}
API Server received data with a warning:
{'cmd': 'warning', 'msg': <warning message>}
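A worker can branch on the three response shapes listed for send_job_results(). The helper below is a minimal sketch; summarize_result_response is a hypothetical name and is not part of the interface. It takes the already decoded response.json() payload.

```python
def summarize_result_response(payload):
    """Return (accepted, message) for a decoded /worker_job_result response."""
    cmd = payload.get('cmd')
    if cmd == 'ok':
        return True, ''
    if cmd == 'warning':
        # The data was received, but the server attached a warning.
        return True, payload.get('msg', '')
    if cmd == 'error':
        return False, payload.get('msg', '')
    # Anything else is not covered by the documented examples.
    return False, f'unexpected cmd: {cmd!r}'


print(summarize_result_response({'cmd': 'ok'}))
print(summarize_result_response({'cmd': 'error', 'msg': 'job not found'}))
```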
- send_progress(progress, progress_data=None, progress_received_callback=None, progress_error_callback=None, job_data={}, job_id=None)
Processes/converts job progress information and data and sends it to the API Server on route /worker_job_progress, asynchronously to the main thread, using Pool().apply_async() from multiprocessing.dummy. When the API server has received the progress data, self.progress_data_received is set to True. Use progress_received_callback and progress_error_callback to handle the response.
- Parameters:
progress (int) – Current progress (e.g. percent or number of generated tokens).
progress_data (dict, optional) – Dictionary with progress_images or text produced while the worker is computing. Example progress data: {'progress_images': [<PIL.Image.Image>, <PIL.Image.Image>, …]}. Defaults to None.
progress_received_callback (callable, optional) – Callback function with API server response as argument. Called when progress_data is received. Defaults to None.
progress_error_callback (callable, optional) – Callback function with requests.exceptions.ConnectionError or HTTP response with status_code == 503 as argument. Called when the API server replied with an error. Defaults to None.
job_data (dict, optional) – To use different job_data than the received one or to identify the related job if no job_id is given. Defaults to {}.
job_id (str, optional) – To identify the related job. Defaults to None.
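The asynchronous pattern named in the send_progress() description, Pool().apply_async() from multiprocessing.dummy combined with a progress_data_received flag, can be sketched as below. ProgressSender and post_progress are hypothetical stand-ins rather than the library's code, and the initial flag value of True is an assumption made so that the first update can be sent.

```python
from multiprocessing.dummy import Pool  # thread-based Pool with the multiprocessing API


class ProgressSender:
    """Illustrative stand-in, not the APIWorkerInterface implementation."""

    def __init__(self, post_progress):
        self._post_progress = post_progress   # hypothetical callable doing the HTTP request
        self._pool = Pool(1)
        self.progress_data_received = True    # assumption: nothing is in flight yet

    def send_progress(self, progress, progress_data=None):
        self.progress_data_received = False   # transmission starts
        payload = {'progress': progress, 'progress_data': progress_data}
        return self._pool.apply_async(
            self._post_progress,
            args=(payload,),
            callback=self._on_received,
            error_callback=self._on_error,
        )

    def _on_received(self, response):
        self.progress_data_received = True    # server answered

    def _on_error(self, error):
        self.progress_data_received = False   # stays False after an error

    def close(self):
        self._pool.close()
        self._pool.join()


sender = ProgressSender(lambda payload: {'cmd': 'ok'})
pending = sender.send_progress(50, {'text': 'partial output'})
pending.wait()   # only for the demo; a worker would keep computing instead
print(sender.progress_data_received)
sender.close()
```

Checking the flag before each call, as in the examples at the top of this page, keeps a slow connection from queuing up stale progress updates.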
- send_batch_progress(batch_progress, progress_batch_data, progress_received_callback=None, progress_error_callback=None, job_batch_data=[], job_batch_ids=None)
Processes/converts job progress information and data for a batch of jobs and sends it to the API Server on route /worker_job_progress, asynchronously to the main thread, using Pool().apply_async() from multiprocessing.dummy. When the API server has received the progress data, self.progress_data_received is set to True. Use progress_received_callback and progress_error_callback to handle the response.
- Parameters:
batch_progress (list(int, int, ...)) – Current progress of each job (e.g. percent or number of generated tokens).
progress_batch_data (list(dict, dict, ...), optional) – List of dictionaries with progress_images or text produced while the worker is computing. Example progress data for one job: {'progress_images': [<PIL.Image.Image>, <PIL.Image.Image>, …]}. Defaults to None.
progress_received_callback (callable, optional) – Callback function with API server response as argument. Called when progress_data is received. Defaults to None.
progress_error_callback (callable, optional) – Callback function with requests.exceptions.ConnectionError or HTTP response with status_code == 503 as argument. Called when the API server replied with an error. Defaults to None.
job_batch_data (list(dict, dict, ...), optional) – List of job_data dicts, to use different job_data than the received ones or to identify the related jobs if no job_batch_ids are given. Defaults to [].
job_batch_ids (list(int, int, ...), optional) – List of job ids to identify the related jobs. Defaults to None.
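The list arguments of send_batch_progress() are aligned per job. The sketch below builds such aligned lists using the percent formula from the progress example at the top of this page; build_batch_progress, steps_done and total_steps are hypothetical names, and the requirement that the lists have equal length is an assumption.

```python
def build_batch_progress(job_batch_ids, steps_done, total_steps):
    """Return per-job progress in percent, ordered like job_batch_ids."""
    if len(job_batch_ids) != len(steps_done):
        raise ValueError('one step counter per job id is required')
    return [round(step * 100 / total_steps) for step in steps_done]


job_batch_ids = ['JID1', 'JID2']
batch_progress = build_batch_progress(job_batch_ids, steps_done=[5, 10], total_steps=20)
# One progress dict per job, in the same order as the ids.
progress_batch_data = [{'text': f'partial output of {job_id}'} for job_id in job_batch_ids]
print(batch_progress)
```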
- get_current_job_data(job_id=None)
Gets the job_data of the job currently being processed.
- Parameters:
job_id (string, optional) – For single job processing (max_job_batch=1) the job_id is not required. For batch job processing (max_job_batch>1) the job_id is required to specify which job's job_data is returned. Defaults to None.
- Returns:
The job_data of the only current job, or of the job with the given job_id.
- Return type:
dict
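The job_id rule for get_current_job_data() (optional for a single job, required for a batch) can be illustrated with a small lookup. select_job_data is a hypothetical helper operating on a plain list of job_data dicts; how the interface reacts to a missing or unknown job_id is not stated in this documentation, so the exceptions raised here are assumptions.

```python
def select_job_data(current_job_batch, job_id=None):
    """Pick one job_data dict from the jobs currently being processed."""
    if job_id is None:
        if len(current_job_batch) != 1:
            raise ValueError('job_id is required when more than one job is being processed')
        return current_job_batch[0]   # single job processing: no job_id needed
    for job_data in current_job_batch:
        if job_data.get('job_id') == job_id:
            return job_data
    raise KeyError(job_id)


batch = [{'job_id': 'JID1', 'prompt': 'a'}, {'job_id': 'JID2', 'prompt': 'b'}]
print(select_job_data(batch[:1]))
print(select_job_data(batch, job_id='JID2'))
```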
- get_current_job_batch_data()
Gets the job_data of all jobs in the current batch as a list.
- Returns:
The list of job_data dicts of the jobs currently being processed.
- Return type:
list
- get_job_batch_parameter(param_name)
Gets the values of a specific job_data parameter for all jobs in the batch as a list.
- Returns:
The value list of the parameter across the batch
- Return type:
list
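Conceptually, get_job_batch_parameter() collects one parameter across the batch. A minimal sketch over plain dicts follows; batch_parameter_values is a hypothetical name, and returning None for jobs that lack the parameter is an assumption.

```python
def batch_parameter_values(job_batch_data, param_name):
    """Return the value of param_name for every job, in batch order."""
    return [job_data.get(param_name) for job_data in job_batch_data]


job_batch_data = [
    {'job_id': 'JID1', 'prompt': 'a cat', 'seed': 1},
    {'job_id': 'JID2', 'prompt': 'a dog', 'seed': 2},
]
print(batch_parameter_values(job_batch_data, 'prompt'))
```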
- has_job_finished(job_data={}, job_id=None)
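Checks whether a specific job has been answered with send_job_results() and is therefore finished.
- Parameters:
job_data (dict, optional) – job_data of the job to check. Defaults to {}.
job_id (str, optional) – ID of the job to check. Defaults to None.
- Returns:
True if job results have been sent for the job, False otherwise.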
- Return type:
bool
- have_all_jobs_finished()
Checks whether all jobs have been answered with send_job_results() and are therefore finished.
- Returns:
True if job results have been sent for all jobs, False otherwise.
- Return type:
bool
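The bookkeeping behind has_job_finished() and have_all_jobs_finished() can be pictured as a set of job ids still waiting for results. FinishedJobTracker below is a hypothetical illustration of that idea and makes no claim about how the interface stores its state.

```python
class FinishedJobTracker:
    """Tracks which jobs of a batch have had their results sent."""

    def __init__(self, job_batch_data):
        self._pending = {job_data['job_id'] for job_data in job_batch_data}

    def mark_results_sent(self, job_id):
        # Would be called whenever results for job_id have been sent.
        self._pending.discard(job_id)

    def has_job_finished(self, job_id):
        return job_id not in self._pending

    def have_all_jobs_finished(self):
        return not self._pending


tracker = FinishedJobTracker([{'job_id': 'JID1'}, {'job_id': 'JID2'}])
tracker.mark_results_sent('JID1')
print(tracker.has_job_finished('JID1'), tracker.have_all_jobs_finished())
```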
- wait_for_job()
Waits until self.new_job_event.set() is called, while periodically printing an idle string in a non-blocking background thread.
- gracefully_exit()
Gracefully exits the application, cleaning up resources and threads, and optionally calls the custom exit_callback passed to the APIWorkerInterface() constructor.
- async_check_server_connection(check_server_callback=None, check_server_error_callback=None, terminal_output=True)
Non-blocking check of the API server status on route /worker_check_server_status using Pool().apply_async() from multiprocessing.dummy.
- Parameters:
check_server_callback (callable, optional) – Callback function with API server response as argument. Called after a successful server check. Defaults to None.
check_server_error_callback (callable, optional) – Callback function with requests.exceptions.ConnectionError as argument. Called when the server replied with an error. Defaults to None.
terminal_output (bool, optional) – Prints server status to terminal if True. Defaults to True.
- check_periodically_if_server_online(interval_seconds=1)
Periodically checks, every interval_seconds seconds, whether the server is online by sending POST requests to route /worker_check_server_status.
- Parameters:
interval_seconds (int, optional) – Interval in seconds between checks whether the server is online. Defaults to 1.
- Returns:
True if server is available again
- Return type:
bool
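The periodic check in check_periodically_if_server_online() amounts to a polling loop. The sketch below assumes a hypothetical is_server_online callable in place of the POST request to /worker_check_server_status, and takes the sleep function as a parameter so that the loop can be exercised without waiting.

```python
import time


def wait_until_online(is_server_online, interval_seconds=1, sleep=time.sleep):
    """Poll is_server_online() every interval_seconds until it returns True."""
    while not is_server_online():
        sleep(interval_seconds)
    return True


# Simulated server that is offline for two checks and then comes back.
_statuses = iter([False, False, True])
naps = []
result = wait_until_online(lambda: next(_statuses), interval_seconds=1, sleep=naps.append)
print(result, naps)
```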
- get_pnginfo_metadata(job_data)
Parses and returns image metadata from job_data.
- Returns:
PngInfo Object with metadata for PNG images
- Return type:
PIL.PngImagePlugin.PngInfo
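As an illustration of which values end up in the image metadata, the sketch below collects them into a plain dict instead of a PngInfo object. collect_image_metadata is a hypothetical helper; the fixed keys and the 'AIME API <endpoint_name>' value follow the image_metadata_params description above, while the string conversion and the skipping of absent parameters are assumptions.

```python
FIXED_METADATA_KEYS = ('Artist', 'ProcessingSoftware', 'Software', 'ImageEditingSoftware')


def collect_image_metadata(job_data, image_metadata_params, endpoint_name):
    """Return metadata key/value pairs as strings for one generated image."""
    metadata = {key: f'AIME API {endpoint_name}' for key in FIXED_METADATA_KEYS}
    for name in image_metadata_params:
        if name in job_data:              # assumption: absent parameters are skipped
            metadata[name] = str(job_data[name])
    return metadata


job_data = {'job_id': 'JID1', 'prompt': 'a cat', 'seed': 42}
metadata = collect_image_metadata(job_data, ['prompt', 'negative_prompt', 'seed'],
                                  'stable_diffusion_xl_txt2img')
print(metadata)
```

Each pair could then be written to a PIL.PngImagePlugin.PngInfo instance with its add_text(key, value) method.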
- dot_string_generator()
Generator of a string with a moving dot for the server status print output.
- Yields:
str – String containing a dot that moves one position with each call
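A generator with this behaviour might look like the sketch below. moving_dot and its width parameter are hypothetical; the actual string width and movement pattern used by dot_string_generator() are not specified here.

```python
import itertools


def moving_dot(width=5):
    """Yield fixed-width strings in which a single dot advances one position per call."""
    for position in itertools.cycle(range(width)):
        yield ' ' * position + '.' + ' ' * (width - position - 1)


dots = moving_dot(width=3)
frames = [next(dots) for _ in range(4)]
print(frames)
```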
- static get_version()
Parses name and version of AIME API Worker Interface with pkg_resources
- Returns:
Name and version of AIME API Worker Interface
- Return type:
str