File size: 4,894 Bytes
09565b8
 
 
 
4e95c16
09565b8
 
 
4e95c16
09565b8
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
8ce80d5
09565b8
 
6ec55cf
 
 
 
 
09565b8
 
6ec55cf
 
09565b8
6ec55cf
 
 
 
 
8ce80d5
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
63f09ca
8ce80d5
 
 
 
 
 
 
 
 
 
 
09565b8
 
 
6ec55cf
 
8ce80d5
6ec55cf
 
 
 
 
 
09565b8
 
6ec55cf
 
 
 
 
 
8ce80d5
6ec55cf
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
09565b8
 
e2eb790
09565b8
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
# import required modules
import os
import time
import base64
import asyncio
import gradio as gr
from PIL import Image
from google import genai
from threading import Lock
from google.genai import types

# define which Gemini model version is going to be used
model_id = "gemini-2.0-flash-exp"

# Gemini model system instruction settings
# NOTE(review): this is a single triple-quoted string, not a tuple — the inner
# quotes and commas are sent verbatim to the model as part of the prompt text.
# Also note the missing trailing comma after the second line; harmless here
# since it is all literal text, but flagging in case a list was intended.
system_instruction="""
    "You always reply in the same language the user sent the question. It is mandatory.",
    "You only change the response language if explicitly asked - otherwise, answer in the original language."
    "You are an assistant who helps people with their questions.",
    "You only provide answers in one paragraph or less.",
    "Your answers are long enough to not miss any information.",
    "You are always kind and use simple, pleasant language.",
"""


## helper functions
# convert image files to base64 data
def image_to_base64(image_path):
  """Read the file at *image_path* and return its bytes base64-encoded as a UTF-8 string."""
  with open(image_path, 'rb') as img:
    raw_bytes = img.read()
  return base64.b64encode(raw_bytes).decode('utf-8')


# show user message at the chatbot history
def query_message(history, txt, img):
  """Append the user's turn to the chatbot history.

  Args:
    history: list of (user_message, bot_message) tuples; mutated in place.
    txt: the user's text message.
    img: optional path to an uploaded image file (falsy when absent).

  Returns:
    The same history list with one new (user, None) entry appended; the
    bot side is filled in later by the streaming handler.
  """
  if img:
    # fix: the original bound this to the name `base64`, shadowing the
    # imported base64 module inside this function
    encoded = image_to_base64(img)
    # NOTE(review): MIME type is hard-coded to JPEG regardless of the
    # actual uploaded format — confirm whether PNG uploads render correctly
    data_url = f"data:image/jpeg;base64,{encoded}"
    history += [(f"{txt} ![]({data_url})", None)]
  else:
    history += [(txt, None)]
  return history

## gradio interface
# gradio page variables
# HTML fragments rendered above the chat UI
TITLE = """<h1 align="center">Gemini 2.0 Chatbot 🤖</h1>"""
SUBTITLE = """<h2 align="center">A multimodal chatbot powered by Gradio and Gemini API</h2>"""

# gradio styles in css
# .chatbot height tracks the viewport (minus header space) so the history
# fills the window; .container stretches the layout to full width
css = """
.container {
    max-width: 100%;
    padding: 0 1rem;
}

.chatbot {
    height: calc(100vh - 250px) !important;
    overflow-y: auto;
}

.textbox {
    margin-top: 0.5rem;
}
"""

# gradio chatbot main function
def registry(name, token, examples=None, **kwargs):
  """Build and return the Gradio Blocks app for the Gemini chatbot.

  Args:
    name: Gemini model identifier to chat with (passed to client.chats.create).
    token: Gemini API key used to construct the genai client.
    examples: unused; accepted for compatibility with gr.load's registry API.
    **kwargs: unused; accepted for forward compatibility.

  Returns:
    A gr.Blocks app wiring image/text inputs to streaming Gemini responses.
  """
  client = genai.Client(api_key=token)
  # NOTE(review): these dicts grow without bound (one entry per distinct
  # client host) and are never pruned — consider TTL/LRU eviction for
  # long-running deployments.
  chat_locks = {}  # Dictionary to hold locks for each user's chat
  chat_sessions = {} # Dictionary to hold each user chat

  def create_chat():
    # One stateful chat session, configured with the module-level prompt.
    return client.chats.create(
      model=name,
      config=types.GenerateContentConfig(
        system_instruction=system_instruction,
        temperature=0.5,
      ),
    )

  # send a user message to Gemini, streams the response back to the chatbot
  # and updates the history
  def stream_response(history, text, img, request: gr.Request):
    # NOTE(review): keying sessions by client IP means users behind a
    # shared NAT share one chat — request.session_hash would be per-tab;
    # confirm before changing, as it alters session granularity.
    user_id = request.client.host
    if user_id not in chat_locks:
      chat_locks[user_id] = Lock()
      chat_sessions[user_id] = create_chat()

    lock = chat_locks[user_id]
    chat = chat_sessions[user_id]

    try:
      with lock:
        if not img:
          response_stream = chat.send_message_stream(
            text
          )
        else:
          try:
            img = Image.open(img)
            response_stream = chat.send_message_stream(
              [text, img]
            )
          except Exception as e:
            # Fix: surface the failure in the chat instead of returning
            # silently, which left the user's bubble with no reply at all.
            print(f"Error processing image: {str(e)}")
            history[-1] = (history[-1][0], f"Error processing image: {str(e)}")
            yield history
            return

        # Initialize response text
        response_text = ""

        # Stream the response, rewriting the last turn with the growing
        # partial reply so the UI updates live on every chunk
        for chunk in response_stream:
          if chunk.text:
            response_text += chunk.text
            # Update the last message in history with the new content
            history[-1] = (history[-1][0], response_text)
            yield history

    except Exception as e:
      # Fix: same as above — show the error in the UI instead of stalling.
      print(f"Error in stream_response: {str(e)}")
      history[-1] = (history[-1][0], f"Error: {str(e)}")
      yield history
      return

  print("Building the gradio app...")
  with gr.Blocks(css=css) as app:
    gr.HTML(TITLE)
    gr.HTML(SUBTITLE)

    with gr.Row():
      image_box = gr.Image(type="filepath")
      chatbot = gr.Chatbot(
        scale=2,
        height=500,
        container=True
      )

    text_box = gr.Textbox(
      placeholder="Type your message and press enter and optionally upload an image",
      container=False,
    )

    btn = gr.Button("Send")

    # Update the event handlers to use streaming
    # NOTE(review): both the click and submit chains register
    # api_name="stream_response" — gradio may warn or rename the duplicate;
    # confirm the exposed API names if clients call them directly.
    btn.click(
      fn=query_message,
      inputs=[chatbot, text_box, image_box],
      outputs=[chatbot],
    ).then(
      fn=stream_response,
      inputs=[chatbot, text_box, image_box],
      outputs=[chatbot],
      api_name="stream_response"
    ).then(
      fn=lambda: (None, ""),  # Clear the image and text inputs after sending
      inputs=None,
      outputs=[image_box, text_box],
    )

    # Add enter key handler
    text_box.submit(
      fn=query_message,
      inputs=[chatbot, text_box, image_box],
      outputs=[chatbot],
    ).then(
      fn=stream_response,
      inputs=[chatbot, text_box, image_box],
      outputs=[chatbot],
      api_name="stream_response"
    ).then(
      fn=lambda: (None, ""),  # Clear the image and text inputs after sending
      inputs=None,
      outputs=[image_box, text_box],
    )

  return app

if __name__ == "__main__":
  # Launch the gradio chatbot: gr.load resolves the model via our registry
  # factory, prompting for the API token in the UI (accept_token=True).
  app = gr.load(
    name=model_id,
    src=registry,
    accept_token=True,
  )
  app.launch()