Chroma / comfy_api_nodes /nodes_openai.py
gokaygokay's picture
Upload 1005 files
77f10a3 verified
import io
from inspect import cleandoc
import numpy as np
import torch
from PIL import Image
from comfy.comfy_types.node_typing import IO, ComfyNodeABC, InputTypeDict
from comfy_api_nodes.apis import (
OpenAIImageGenerationRequest,
OpenAIImageEditRequest,
OpenAIImageGenerationResponse,
)
from comfy_api_nodes.apis.client import (
ApiEndpoint,
HttpMethod,
SynchronousOperation,
)
from comfy_api_nodes.apinode_utils import (
downscale_image_tensor,
validate_and_cast_response,
validate_string,
)
class OpenAIDalle2(ComfyNodeABC):
"""
Generates images synchronously via OpenAI's DALL路E 2 endpoint.
"""
def __init__(self):
pass
@classmethod
def INPUT_TYPES(cls) -> InputTypeDict:
return {
"required": {
"prompt": (
IO.STRING,
{
"multiline": True,
"default": "",
"tooltip": "Text prompt for DALL路E",
},
),
},
"optional": {
"seed": (
IO.INT,
{
"default": 0,
"min": 0,
"max": 2**31 - 1,
"step": 1,
"display": "number",
"control_after_generate": True,
"tooltip": "not implemented yet in backend",
},
),
"size": (
IO.COMBO,
{
"options": ["256x256", "512x512", "1024x1024"],
"default": "1024x1024",
"tooltip": "Image size",
},
),
"n": (
IO.INT,
{
"default": 1,
"min": 1,
"max": 8,
"step": 1,
"display": "number",
"tooltip": "How many images to generate",
},
),
"image": (
IO.IMAGE,
{
"default": None,
"tooltip": "Optional reference image for image editing.",
},
),
"mask": (
IO.MASK,
{
"default": None,
"tooltip": "Optional mask for inpainting (white areas will be replaced)",
},
),
},
"hidden": {
"auth_token": "AUTH_TOKEN_COMFY_ORG",
"comfy_api_key": "API_KEY_COMFY_ORG",
"unique_id": "UNIQUE_ID",
},
}
RETURN_TYPES = (IO.IMAGE,)
FUNCTION = "api_call"
CATEGORY = "api node/image/OpenAI"
DESCRIPTION = cleandoc(__doc__ or "")
API_NODE = True
def api_call(
self,
prompt,
seed=0,
image=None,
mask=None,
n=1,
size="1024x1024",
unique_id=None,
**kwargs
):
validate_string(prompt, strip_whitespace=False)
model = "dall-e-2"
path = "/proxy/openai/images/generations"
content_type = "application/json"
request_class = OpenAIImageGenerationRequest
img_binary = None
if image is not None and mask is not None:
path = "/proxy/openai/images/edits"
content_type = "multipart/form-data"
request_class = OpenAIImageEditRequest
input_tensor = image.squeeze().cpu()
height, width, channels = input_tensor.shape
rgba_tensor = torch.ones(height, width, 4, device="cpu")
rgba_tensor[:, :, :channels] = input_tensor
if mask.shape[1:] != image.shape[1:-1]:
raise Exception("Mask and Image must be the same size")
rgba_tensor[:, :, 3] = 1 - mask.squeeze().cpu()
rgba_tensor = downscale_image_tensor(rgba_tensor.unsqueeze(0)).squeeze()
image_np = (rgba_tensor.numpy() * 255).astype(np.uint8)
img = Image.fromarray(image_np)
img_byte_arr = io.BytesIO()
img.save(img_byte_arr, format="PNG")
img_byte_arr.seek(0)
img_binary = img_byte_arr # .getvalue()
img_binary.name = "image.png"
elif image is not None or mask is not None:
raise Exception("Dall-E 2 image editing requires an image AND a mask")
# Build the operation
operation = SynchronousOperation(
endpoint=ApiEndpoint(
path=path,
method=HttpMethod.POST,
request_model=request_class,
response_model=OpenAIImageGenerationResponse,
),
request=request_class(
model=model,
prompt=prompt,
n=n,
size=size,
seed=seed,
),
files=(
{
"image": img_binary,
}
if img_binary
else None
),
content_type=content_type,
auth_kwargs=kwargs,
)
response = operation.execute()
img_tensor = validate_and_cast_response(response, node_id=unique_id)
return (img_tensor,)
class OpenAIDalle3(ComfyNodeABC):
"""
Generates images synchronously via OpenAI's DALL路E 3 endpoint.
"""
def __init__(self):
pass
@classmethod
def INPUT_TYPES(cls) -> InputTypeDict:
return {
"required": {
"prompt": (
IO.STRING,
{
"multiline": True,
"default": "",
"tooltip": "Text prompt for DALL路E",
},
),
},
"optional": {
"seed": (
IO.INT,
{
"default": 0,
"min": 0,
"max": 2**31 - 1,
"step": 1,
"display": "number",
"control_after_generate": True,
"tooltip": "not implemented yet in backend",
},
),
"quality": (
IO.COMBO,
{
"options": ["standard", "hd"],
"default": "standard",
"tooltip": "Image quality",
},
),
"style": (
IO.COMBO,
{
"options": ["natural", "vivid"],
"default": "natural",
"tooltip": "Vivid causes the model to lean towards generating hyper-real and dramatic images. Natural causes the model to produce more natural, less hyper-real looking images.",
},
),
"size": (
IO.COMBO,
{
"options": ["1024x1024", "1024x1792", "1792x1024"],
"default": "1024x1024",
"tooltip": "Image size",
},
),
},
"hidden": {
"auth_token": "AUTH_TOKEN_COMFY_ORG",
"comfy_api_key": "API_KEY_COMFY_ORG",
"unique_id": "UNIQUE_ID",
},
}
RETURN_TYPES = (IO.IMAGE,)
FUNCTION = "api_call"
CATEGORY = "api node/image/OpenAI"
DESCRIPTION = cleandoc(__doc__ or "")
API_NODE = True
def api_call(
self,
prompt,
seed=0,
style="natural",
quality="standard",
size="1024x1024",
unique_id=None,
**kwargs
):
validate_string(prompt, strip_whitespace=False)
model = "dall-e-3"
# build the operation
operation = SynchronousOperation(
endpoint=ApiEndpoint(
path="/proxy/openai/images/generations",
method=HttpMethod.POST,
request_model=OpenAIImageGenerationRequest,
response_model=OpenAIImageGenerationResponse,
),
request=OpenAIImageGenerationRequest(
model=model,
prompt=prompt,
quality=quality,
size=size,
style=style,
seed=seed,
),
auth_kwargs=kwargs,
)
response = operation.execute()
img_tensor = validate_and_cast_response(response, node_id=unique_id)
return (img_tensor,)
class OpenAIGPTImage1(ComfyNodeABC):
"""
Generates images synchronously via OpenAI's GPT Image 1 endpoint.
"""
def __init__(self):
pass
@classmethod
def INPUT_TYPES(cls) -> InputTypeDict:
return {
"required": {
"prompt": (
IO.STRING,
{
"multiline": True,
"default": "",
"tooltip": "Text prompt for GPT Image 1",
},
),
},
"optional": {
"seed": (
IO.INT,
{
"default": 0,
"min": 0,
"max": 2**31 - 1,
"step": 1,
"display": "number",
"control_after_generate": True,
"tooltip": "not implemented yet in backend",
},
),
"quality": (
IO.COMBO,
{
"options": ["low", "medium", "high"],
"default": "low",
"tooltip": "Image quality, affects cost and generation time.",
},
),
"background": (
IO.COMBO,
{
"options": ["opaque", "transparent"],
"default": "opaque",
"tooltip": "Return image with or without background",
},
),
"size": (
IO.COMBO,
{
"options": ["auto", "1024x1024", "1024x1536", "1536x1024"],
"default": "auto",
"tooltip": "Image size",
},
),
"n": (
IO.INT,
{
"default": 1,
"min": 1,
"max": 8,
"step": 1,
"display": "number",
"tooltip": "How many images to generate",
},
),
"image": (
IO.IMAGE,
{
"default": None,
"tooltip": "Optional reference image for image editing.",
},
),
"mask": (
IO.MASK,
{
"default": None,
"tooltip": "Optional mask for inpainting (white areas will be replaced)",
},
),
},
"hidden": {
"auth_token": "AUTH_TOKEN_COMFY_ORG",
"comfy_api_key": "API_KEY_COMFY_ORG",
"unique_id": "UNIQUE_ID",
},
}
RETURN_TYPES = (IO.IMAGE,)
FUNCTION = "api_call"
CATEGORY = "api node/image/OpenAI"
DESCRIPTION = cleandoc(__doc__ or "")
API_NODE = True
def api_call(
self,
prompt,
seed=0,
quality="low",
background="opaque",
image=None,
mask=None,
n=1,
size="1024x1024",
unique_id=None,
**kwargs
):
validate_string(prompt, strip_whitespace=False)
model = "gpt-image-1"
path = "/proxy/openai/images/generations"
content_type="application/json"
request_class = OpenAIImageGenerationRequest
img_binaries = []
mask_binary = None
files = []
if image is not None:
path = "/proxy/openai/images/edits"
request_class = OpenAIImageEditRequest
content_type ="multipart/form-data"
batch_size = image.shape[0]
for i in range(batch_size):
single_image = image[i : i + 1]
scaled_image = downscale_image_tensor(single_image).squeeze()
image_np = (scaled_image.numpy() * 255).astype(np.uint8)
img = Image.fromarray(image_np)
img_byte_arr = io.BytesIO()
img.save(img_byte_arr, format="PNG")
img_byte_arr.seek(0)
img_binary = img_byte_arr
img_binary.name = f"image_{i}.png"
img_binaries.append(img_binary)
if batch_size == 1:
files.append(("image", img_binary))
else:
files.append(("image[]", img_binary))
if mask is not None:
if image is None:
raise Exception("Cannot use a mask without an input image")
if image.shape[0] != 1:
raise Exception("Cannot use a mask with multiple image")
if mask.shape[1:] != image.shape[1:-1]:
raise Exception("Mask and Image must be the same size")
batch, height, width = mask.shape
rgba_mask = torch.zeros(height, width, 4, device="cpu")
rgba_mask[:, :, 3] = 1 - mask.squeeze().cpu()
scaled_mask = downscale_image_tensor(rgba_mask.unsqueeze(0)).squeeze()
mask_np = (scaled_mask.numpy() * 255).astype(np.uint8)
mask_img = Image.fromarray(mask_np)
mask_img_byte_arr = io.BytesIO()
mask_img.save(mask_img_byte_arr, format="PNG")
mask_img_byte_arr.seek(0)
mask_binary = mask_img_byte_arr
mask_binary.name = "mask.png"
files.append(("mask", mask_binary))
# Build the operation
operation = SynchronousOperation(
endpoint=ApiEndpoint(
path=path,
method=HttpMethod.POST,
request_model=request_class,
response_model=OpenAIImageGenerationResponse,
),
request=request_class(
model=model,
prompt=prompt,
quality=quality,
background=background,
n=n,
seed=seed,
size=size,
),
files=files if files else None,
content_type=content_type,
auth_kwargs=kwargs,
)
response = operation.execute()
img_tensor = validate_and_cast_response(response, node_id=unique_id)
return (img_tensor,)
# A dictionary that contains all nodes you want to export with their names
# NOTE: names should be globally unique
NODE_CLASS_MAPPINGS = {
"OpenAIDalle2": OpenAIDalle2,
"OpenAIDalle3": OpenAIDalle3,
"OpenAIGPTImage1": OpenAIGPTImage1,
}
# A dictionary that contains the friendly/humanly readable titles for the nodes
NODE_DISPLAY_NAME_MAPPINGS = {
"OpenAIDalle2": "OpenAI DALL路E 2",
"OpenAIDalle3": "OpenAI DALL路E 3",
"OpenAIGPTImage1": "OpenAI GPT Image 1",
}