import datetime import inspect import mimetypes import sys from os import getcwd, unlink from platform import system from tempfile import NamedTemporaryFile from typing import Any, Callable, Dict, List, Type from PIL import Image import pandas as pd import streamlit as st from fastapi.encoders import jsonable_encoder from loguru import logger from pydantic import BaseModel, ValidationError, parse_obj_as from mkgui.base import Opyrator from mkgui.base.core import name_to_title from mkgui.base.ui import schema_utils from mkgui.base.ui.streamlit_utils import CUSTOM_STREAMLIT_CSS STREAMLIT_RUNNER_SNIPPET = """ from mkgui.base.ui import render_streamlit_ui from mkgui.base import Opyrator import streamlit as st # TODO: Make it configurable # Page config can only be setup once st.set_page_config( page_title="MockingBird", page_icon="🧊", layout="wide") render_streamlit_ui() """ # with st.spinner("Loading MockingBird GUI. Please wait..."): # opyrator = Opyrator("{opyrator_path}") def launch_ui(port: int = 8501) -> None: with NamedTemporaryFile( suffix=".py", mode="w", encoding="utf-8", delete=False ) as f: f.write(STREAMLIT_RUNNER_SNIPPET) f.seek(0) import subprocess python_path = f'PYTHONPATH="$PYTHONPATH:{getcwd()}"' if system() == "Windows": python_path = f"set PYTHONPATH=%PYTHONPATH%;{getcwd()} &&" subprocess.run( f"""set STREAMLIT_GLOBAL_SHOW_WARNING_ON_DIRECT_EXECUTION=false""", shell=True, ) subprocess.run( f"""{python_path} "{sys.executable}" -m streamlit run --server.port={port} --server.headless=True --runner.magicEnabled=False --server.maxUploadSize=50 --browser.gatherUsageStats=False {f.name}""", shell=True, ) f.close() unlink(f.name) def function_has_named_arg(func: Callable, parameter: str) -> bool: try: sig = inspect.signature(func) for param in sig.parameters.values(): if param.name == "input": return True except Exception: return False return False def has_output_ui_renderer(data_item: BaseModel) -> bool: return hasattr(data_item, "render_output_ui") def has_input_ui_renderer(input_class: Type[BaseModel]) -> bool: return hasattr(input_class, "render_input_ui") def is_compatible_audio(mime_type: str) -> bool: return mime_type in ["audio/mpeg", "audio/ogg", "audio/wav"] def is_compatible_image(mime_type: str) -> bool: return mime_type in ["image/png", "image/jpeg"] def is_compatible_video(mime_type: str) -> bool: return mime_type in ["video/mp4"] class InputUI: def __init__(self, session_state, input_class: Type[BaseModel]): self._session_state = session_state self._input_class = input_class self._schema_properties = input_class.schema(by_alias=True).get( "properties", {} ) self._schema_references = input_class.schema(by_alias=True).get( "definitions", {} ) def render_ui(self, streamlit_app_root) -> None: if has_input_ui_renderer(self._input_class): # The input model has a rendering function # The rendering also returns the current state of input data self._session_state.input_data = self._input_class.render_input_ui( # type: ignore st, self._session_state.input_data ) return # print(self._schema_properties) for property_key in self._schema_properties.keys(): property = self._schema_properties[property_key] if not property.get("title"): # Set property key as fallback title property["title"] = name_to_title(property_key) try: if "input_data" in self._session_state: self._store_value( property_key, self._render_property(streamlit_app_root, property_key, property), ) except Exception as e: print("Exception!", e) pass def _get_default_streamlit_input_kwargs(self, key: str, property: Dict) -> Dict: streamlit_kwargs = { "label": property.get("title"), "key": key, } if property.get("description"): streamlit_kwargs["help"] = property.get("description") return streamlit_kwargs def _store_value(self, key: str, value: Any) -> None: data_element = self._session_state.input_data key_elements = key.split(".") for i, key_element in enumerate(key_elements): if i == len(key_elements) - 1: # add value to this element data_element[key_element] = value return if key_element not in data_element: data_element[key_element] = {} data_element = data_element[key_element] def _get_value(self, key: str) -> Any: data_element = self._session_state.input_data key_elements = key.split(".") for i, key_element in enumerate(key_elements): if i == len(key_elements) - 1: # add value to this element if key_element not in data_element: return None return data_element[key_element] if key_element not in data_element: data_element[key_element] = {} data_element = data_element[key_element] return None def _render_single_datetime_input( self, streamlit_app: st, key: str, property: Dict ) -> Any: streamlit_kwargs = self._get_default_streamlit_input_kwargs(key, property) if property.get("format") == "time": if property.get("default"): try: streamlit_kwargs["value"] = datetime.time.fromisoformat( # type: ignore property.get("default") ) except Exception: pass return streamlit_app.time_input(**streamlit_kwargs) elif property.get("format") == "date": if property.get("default"): try: streamlit_kwargs["value"] = datetime.date.fromisoformat( # type: ignore property.get("default") ) except Exception: pass return streamlit_app.date_input(**streamlit_kwargs) elif property.get("format") == "date-time": if property.get("default"): try: streamlit_kwargs["value"] = datetime.datetime.fromisoformat( # type: ignore property.get("default") ) except Exception: pass with streamlit_app.container(): streamlit_app.subheader(streamlit_kwargs.get("label")) if streamlit_kwargs.get("description"): streamlit_app.text(streamlit_kwargs.get("description")) selected_date = None selected_time = None date_col, time_col = streamlit_app.columns(2) with date_col: date_kwargs = {"label": "Date", "key": key + "-date-input"} if streamlit_kwargs.get("value"): try: date_kwargs["value"] = streamlit_kwargs.get( # type: ignore "value" ).date() except Exception: pass selected_date = streamlit_app.date_input(**date_kwargs) with time_col: time_kwargs = {"label": "Time", "key": key + "-time-input"} if streamlit_kwargs.get("value"): try: time_kwargs["value"] = streamlit_kwargs.get( # type: ignore "value" ).time() except Exception: pass selected_time = streamlit_app.time_input(**time_kwargs) return datetime.datetime.combine(selected_date, selected_time) else: streamlit_app.warning( "Date format is not supported: " + str(property.get("format")) ) def _render_single_file_input( self, streamlit_app: st, key: str, property: Dict ) -> Any: streamlit_kwargs = self._get_default_streamlit_input_kwargs(key, property) file_extension = None if "mime_type" in property: file_extension = mimetypes.guess_extension(property["mime_type"]) uploaded_file = streamlit_app.file_uploader( **streamlit_kwargs, accept_multiple_files=False, type=file_extension ) if uploaded_file is None: return None bytes = uploaded_file.getvalue() if property.get("mime_type"): if is_compatible_audio(property["mime_type"]): # Show audio streamlit_app.audio(bytes, format=property.get("mime_type")) if is_compatible_image(property["mime_type"]): # Show image streamlit_app.image(bytes) if is_compatible_video(property["mime_type"]): # Show video streamlit_app.video(bytes, format=property.get("mime_type")) return bytes def _render_single_string_input( self, streamlit_app: st, key: str, property: Dict ) -> Any: streamlit_kwargs = self._get_default_streamlit_input_kwargs(key, property) if property.get("default"): streamlit_kwargs["value"] = property.get("default") elif property.get("example"): # TODO: also use example for other property types # Use example as value if it is provided streamlit_kwargs["value"] = property.get("example") if property.get("maxLength") is not None: streamlit_kwargs["max_chars"] = property.get("maxLength") if ( property.get("format") or ( property.get("maxLength") is not None and int(property.get("maxLength")) < 140 # type: ignore ) or property.get("writeOnly") ): # If any format is set, use single text input # If max chars is set to less than 140, use single text input # If write only -> password field if property.get("writeOnly"): streamlit_kwargs["type"] = "password" return streamlit_app.text_input(**streamlit_kwargs) else: # Otherwise use multiline text area return streamlit_app.text_area(**streamlit_kwargs) def _render_multi_enum_input( self, streamlit_app: st, key: str, property: Dict ) -> Any: streamlit_kwargs = self._get_default_streamlit_input_kwargs(key, property) reference_item = schema_utils.resolve_reference( property["items"]["$ref"], self._schema_references ) # TODO: how to select defaults return streamlit_app.multiselect( **streamlit_kwargs, options=reference_item["enum"] ) def _render_single_enum_input( self, streamlit_app: st, key: str, property: Dict ) -> Any: streamlit_kwargs = self._get_default_streamlit_input_kwargs(key, property) reference_item = schema_utils.get_single_reference_item( property, self._schema_references ) if property.get("default") is not None: try: streamlit_kwargs["index"] = reference_item["enum"].index( property.get("default") ) except Exception: # Use default selection pass return streamlit_app.selectbox( **streamlit_kwargs, options=reference_item["enum"] ) def _render_single_dict_input( self, streamlit_app: st, key: str, property: Dict ) -> Any: # Add title and subheader streamlit_app.subheader(property.get("title")) if property.get("description"): streamlit_app.markdown(property.get("description")) streamlit_app.markdown("---") current_dict = self._get_value(key) if not current_dict: current_dict = {} key_col, value_col = streamlit_app.columns(2) with key_col: updated_key = streamlit_app.text_input( "Key", value="", key=key + "-new-key" ) with value_col: # TODO: also add boolean? value_kwargs = {"label": "Value", "key": key + "-new-value"} if property["additionalProperties"].get("type") == "integer": value_kwargs["value"] = 0 # type: ignore updated_value = streamlit_app.number_input(**value_kwargs) elif property["additionalProperties"].get("type") == "number": value_kwargs["value"] = 0.0 # type: ignore value_kwargs["format"] = "%f" updated_value = streamlit_app.number_input(**value_kwargs) else: value_kwargs["value"] = "" updated_value = streamlit_app.text_input(**value_kwargs) streamlit_app.markdown("---") with streamlit_app.container(): clear_col, add_col = streamlit_app.columns([1, 2]) with clear_col: if streamlit_app.button("Clear Items", key=key + "-clear-items"): current_dict = {} with add_col: if ( streamlit_app.button("Add Item", key=key + "-add-item") and updated_key ): current_dict[updated_key] = updated_value streamlit_app.write(current_dict) return current_dict def _render_single_reference( self, streamlit_app: st, key: str, property: Dict ) -> Any: reference_item = schema_utils.get_single_reference_item( property, self._schema_references ) return self._render_property(streamlit_app, key, reference_item) def _render_multi_file_input( self, streamlit_app: st, key: str, property: Dict ) -> Any: streamlit_kwargs = self._get_default_streamlit_input_kwargs(key, property) file_extension = None if "mime_type" in property: file_extension = mimetypes.guess_extension(property["mime_type"]) uploaded_files = streamlit_app.file_uploader( **streamlit_kwargs, accept_multiple_files=True, type=file_extension ) uploaded_files_bytes = [] if uploaded_files: for uploaded_file in uploaded_files: uploaded_files_bytes.append(uploaded_file.read()) return uploaded_files_bytes def _render_single_boolean_input( self, streamlit_app: st, key: str, property: Dict ) -> Any: streamlit_kwargs = self._get_default_streamlit_input_kwargs(key, property) if property.get("default"): streamlit_kwargs["value"] = property.get("default") return streamlit_app.checkbox(**streamlit_kwargs) def _render_single_number_input( self, streamlit_app: st, key: str, property: Dict ) -> Any: streamlit_kwargs = self._get_default_streamlit_input_kwargs(key, property) number_transform = int if property.get("type") == "number": number_transform = float # type: ignore streamlit_kwargs["format"] = "%f" if "multipleOf" in property: # Set stepcount based on multiple of parameter streamlit_kwargs["step"] = number_transform(property["multipleOf"]) elif number_transform == int: # Set step size to 1 as default streamlit_kwargs["step"] = 1 elif number_transform == float: # Set step size to 0.01 as default # TODO: adapt to default value streamlit_kwargs["step"] = 0.01 if "minimum" in property: streamlit_kwargs["min_value"] = number_transform(property["minimum"]) if "exclusiveMinimum" in property: streamlit_kwargs["min_value"] = number_transform( property["exclusiveMinimum"] + streamlit_kwargs["step"] ) if "maximum" in property: streamlit_kwargs["max_value"] = number_transform(property["maximum"]) if "exclusiveMaximum" in property: streamlit_kwargs["max_value"] = number_transform( property["exclusiveMaximum"] - streamlit_kwargs["step"] ) if property.get("default") is not None: streamlit_kwargs["value"] = number_transform(property.get("default")) # type: ignore else: if "min_value" in streamlit_kwargs: streamlit_kwargs["value"] = streamlit_kwargs["min_value"] elif number_transform == int: streamlit_kwargs["value"] = 0 else: # Set default value to step streamlit_kwargs["value"] = number_transform(streamlit_kwargs["step"]) if "min_value" in streamlit_kwargs and "max_value" in streamlit_kwargs: # TODO: Only if less than X steps return streamlit_app.slider(**streamlit_kwargs) else: return streamlit_app.number_input(**streamlit_kwargs) def _render_object_input(self, streamlit_app: st, key: str, property: Dict) -> Any: properties = property["properties"] object_inputs = {} for property_key in properties: property = properties[property_key] if not property.get("title"): # Set property key as fallback title property["title"] = name_to_title(property_key) # construct full key based on key parts -> required later to get the value full_key = key + "." + property_key object_inputs[property_key] = self._render_property( streamlit_app, full_key, property ) return object_inputs def _render_single_object_input( self, streamlit_app: st, key: str, property: Dict ) -> Any: # Add title and subheader title = property.get("title") streamlit_app.subheader(title) if property.get("description"): streamlit_app.markdown(property.get("description")) object_reference = schema_utils.get_single_reference_item( property, self._schema_references ) return self._render_object_input(streamlit_app, key, object_reference) def _render_property_list_input( self, streamlit_app: st, key: str, property: Dict ) -> Any: # Add title and subheader streamlit_app.subheader(property.get("title")) if property.get("description"): streamlit_app.markdown(property.get("description")) streamlit_app.markdown("---") current_list = self._get_value(key) if not current_list: current_list = [] value_kwargs = {"label": "Value", "key": key + "-new-value"} if property["items"]["type"] == "integer": value_kwargs["value"] = 0 # type: ignore new_value = streamlit_app.number_input(**value_kwargs) elif property["items"]["type"] == "number": value_kwargs["value"] = 0.0 # type: ignore value_kwargs["format"] = "%f" new_value = streamlit_app.number_input(**value_kwargs) else: value_kwargs["value"] = "" new_value = streamlit_app.text_input(**value_kwargs) streamlit_app.markdown("---") with streamlit_app.container(): clear_col, add_col = streamlit_app.columns([1, 2]) with clear_col: if streamlit_app.button("Clear Items", key=key + "-clear-items"): current_list = [] with add_col: if ( streamlit_app.button("Add Item", key=key + "-add-item") and new_value is not None ): current_list.append(new_value) streamlit_app.write(current_list) return current_list def _render_object_list_input( self, streamlit_app: st, key: str, property: Dict ) -> Any: # TODO: support max_items, and min_items properties # Add title and subheader streamlit_app.subheader(property.get("title")) if property.get("description"): streamlit_app.markdown(property.get("description")) streamlit_app.markdown("---") current_list = self._get_value(key) if not current_list: current_list = [] object_reference = schema_utils.resolve_reference( property["items"]["$ref"], self._schema_references ) input_data = self._render_object_input(streamlit_app, key, object_reference) streamlit_app.markdown("---") with streamlit_app.container(): clear_col, add_col = streamlit_app.columns([1, 2]) with clear_col: if streamlit_app.button("Clear Items", key=key + "-clear-items"): current_list = [] with add_col: if ( streamlit_app.button("Add Item", key=key + "-add-item") and input_data ): current_list.append(input_data) streamlit_app.write(current_list) return current_list def _render_property(self, streamlit_app: st, key: str, property: Dict) -> Any: if schema_utils.is_single_enum_property(property, self._schema_references): return self._render_single_enum_input(streamlit_app, key, property) if schema_utils.is_multi_enum_property(property, self._schema_references): return self._render_multi_enum_input(streamlit_app, key, property) if schema_utils.is_single_file_property(property): return self._render_single_file_input(streamlit_app, key, property) if schema_utils.is_multi_file_property(property): return self._render_multi_file_input(streamlit_app, key, property) if schema_utils.is_single_datetime_property(property): return self._render_single_datetime_input(streamlit_app, key, property) if schema_utils.is_single_boolean_property(property): return self._render_single_boolean_input(streamlit_app, key, property) if schema_utils.is_single_dict_property(property): return self._render_single_dict_input(streamlit_app, key, property) if schema_utils.is_single_number_property(property): return self._render_single_number_input(streamlit_app, key, property) if schema_utils.is_single_string_property(property): return self._render_single_string_input(streamlit_app, key, property) if schema_utils.is_single_object(property, self._schema_references): return self._render_single_object_input(streamlit_app, key, property) if schema_utils.is_object_list_property(property, self._schema_references): return self._render_object_list_input(streamlit_app, key, property) if schema_utils.is_property_list(property): return self._render_property_list_input(streamlit_app, key, property) if schema_utils.is_single_reference(property): return self._render_single_reference(streamlit_app, key, property) streamlit_app.warning( "The type of the following property is currently not supported: " + str(property.get("title")) ) raise Exception("Unsupported property") class OutputUI: def __init__(self, output_data: Any, input_data: Any): self._output_data = output_data self._input_data = input_data def render_ui(self, streamlit_app) -> None: try: if isinstance(self._output_data, BaseModel): self._render_single_output(streamlit_app, self._output_data) return if type(self._output_data) == list: self._render_list_output(streamlit_app, self._output_data) return except Exception as ex: streamlit_app.exception(ex) # Fallback to streamlit_app.json(jsonable_encoder(self._output_data)) def _render_single_text_property( self, streamlit: st, property_schema: Dict, value: Any ) -> None: # Add title and subheader streamlit.subheader(property_schema.get("title")) if property_schema.get("description"): streamlit.markdown(property_schema.get("description")) if value is None or value == "": streamlit.info("No value returned!") else: streamlit.code(str(value), language="plain") def _render_single_file_property( self, streamlit: st, property_schema: Dict, value: Any ) -> None: # Add title and subheader streamlit.subheader(property_schema.get("title")) if property_schema.get("description"): streamlit.markdown(property_schema.get("description")) if value is None or value == "": streamlit.info("No value returned!") else: # TODO: Detect if it is a FileContent instance # TODO: detect if it is base64 file_extension = "" if "mime_type" in property_schema: mime_type = property_schema["mime_type"] file_extension = mimetypes.guess_extension(mime_type) or "" if is_compatible_audio(mime_type): streamlit.audio(value.as_bytes(), format=mime_type) return if is_compatible_image(mime_type): streamlit.image(value.as_bytes()) return if is_compatible_video(mime_type): streamlit.video(value.as_bytes(), format=mime_type) return filename = ( (property_schema["title"] + file_extension) .lower() .strip() .replace(" ", "-") ) streamlit.markdown( f'', unsafe_allow_html=True, ) def _render_single_complex_property( self, streamlit: st, property_schema: Dict, value: Any ) -> None: # Add title and subheader streamlit.subheader(property_schema.get("title")) if property_schema.get("description"): streamlit.markdown(property_schema.get("description")) streamlit.json(jsonable_encoder(value)) def _render_single_output(self, streamlit: st, output_data: BaseModel) -> None: try: if has_output_ui_renderer(output_data): if function_has_named_arg(output_data.render_output_ui, "input"): # type: ignore # render method also requests the input data output_data.render_output_ui(streamlit, input=self._input_data) # type: ignore else: output_data.render_output_ui(streamlit) # type: ignore return except Exception: # Use default auto-generation methods if the custom rendering throws an exception logger.exception( "Failed to execute custom render_output_ui function. Using auto-generation instead" ) model_schema = output_data.schema(by_alias=False) model_properties = model_schema.get("properties") definitions = model_schema.get("definitions") if model_properties: for property_key in output_data.__dict__: property_schema = model_properties.get(property_key) if not property_schema.get("title"): # Set property key as fallback title property_schema["title"] = property_key output_property_value = output_data.__dict__[property_key] if has_output_ui_renderer(output_property_value): output_property_value.render_output_ui(streamlit) # type: ignore continue if isinstance(output_property_value, BaseModel): # Render output recursivly streamlit.subheader(property_schema.get("title")) if property_schema.get("description"): streamlit.markdown(property_schema.get("description")) self._render_single_output(streamlit, output_property_value) continue if property_schema: if schema_utils.is_single_file_property(property_schema): self._render_single_file_property( streamlit, property_schema, output_property_value ) continue if ( schema_utils.is_single_string_property(property_schema) or schema_utils.is_single_number_property(property_schema) or schema_utils.is_single_datetime_property(property_schema) or schema_utils.is_single_boolean_property(property_schema) ): self._render_single_text_property( streamlit, property_schema, output_property_value ) continue if definitions and schema_utils.is_single_enum_property( property_schema, definitions ): self._render_single_text_property( streamlit, property_schema, output_property_value.value ) continue # TODO: render dict as table self._render_single_complex_property( streamlit, property_schema, output_property_value ) return def _render_list_output(self, streamlit: st, output_data: List) -> None: try: data_items: List = [] for data_item in output_data: if has_output_ui_renderer(data_item): # Render using the render function data_item.render_output_ui(streamlit) # type: ignore continue data_items.append(data_item.dict()) # Try to show as dataframe streamlit.table(pd.DataFrame(data_items)) except Exception: # Fallback to streamlit.json(jsonable_encoder(output_data)) def getOpyrator(mode: str) -> Opyrator: if mode == None or mode.startswith('VC'): from mkgui.app_vc import convert return Opyrator(convert) if mode == None or mode.startswith('预处理'): from mkgui.preprocess import preprocess return Opyrator(preprocess) if mode == None or mode.startswith('模型训练'): from mkgui.train import train return Opyrator(train) if mode == None or mode.startswith('模型训练(VC)'): from mkgui.train_vc import train_vc return Opyrator(train_vc) from mkgui.app import synthesize return Opyrator(synthesize) def render_streamlit_ui() -> None: # init session_state = st.session_state session_state.input_data = {} # Add custom css settings st.markdown(f"", unsafe_allow_html=True) with st.spinner("Loading MockingBird GUI. Please wait..."): session_state.mode = st.sidebar.selectbox( '模式选择', ( "AI拟音", "VC拟音", "预处理", "模型训练", "模型训练(VC)") ) if "mode" in session_state: mode = session_state.mode else: mode = "" opyrator = getOpyrator(mode) title = opyrator.name + mode col1, col2, _ = st.columns(3) col2.title(title) col2.markdown("欢迎使用MockingBird Web 2") image = Image.open('.\\mkgui\\static\\mb.png') col1.image(image) st.markdown("---") left, right = st.columns([0.4, 0.6]) with left: st.header("Control 控制") InputUI(session_state=session_state, input_class=opyrator.input_type).render_ui(st) execute_selected = st.button(opyrator.action) if execute_selected: with st.spinner("Executing operation. Please wait..."): try: input_data_obj = parse_obj_as( opyrator.input_type, session_state.input_data ) session_state.output_data = opyrator(input=input_data_obj) session_state.latest_operation_input = input_data_obj # should this really be saved as additional session object? except ValidationError as ex: st.error(ex) else: # st.success("Operation executed successfully.") pass with right: st.header("Result 结果") if 'output_data' in session_state: OutputUI( session_state.output_data, session_state.latest_operation_input ).render_ui(st) if st.button("Clear"): # Clear all state for key in st.session_state.keys(): del st.session_state[key] session_state.input_data = {} st.experimental_rerun() else: # placeholder st.caption("请使用左侧控制板进行输入并运行获得结果")