Image-Text-to-Text
Transformers
Safetensors
multilingual
minicpmv
feature-extraction
minicpm-v
vision
ocr
multi-image
video
custom_code
conversational
Instructions to use openbmb/MiniCPM-V-4 with libraries, inference providers, notebooks, and local apps. Follow these links to get started.
- Libraries
- Transformers
How to use openbmb/MiniCPM-V-4 with Transformers:
# Use a pipeline as a high-level helper from transformers import pipeline pipe = pipeline("image-text-to-text", model="openbmb/MiniCPM-V-4", trust_remote_code=True) messages = [ { "role": "user", "content": [ {"type": "image", "url": "https://huggingface.co/datasets/huggingface/documentation-images/resolve/main/p-blog/candy.JPG"}, {"type": "text", "text": "What animal is on the candy?"} ] }, ] pipe(text=messages)# Load model directly from transformers import AutoModel model = AutoModel.from_pretrained("openbmb/MiniCPM-V-4", trust_remote_code=True, dtype="auto") - Notebooks
- Google Colab
- Kaggle
- Local Apps
- vLLM
How to use openbmb/MiniCPM-V-4 with vLLM:
Install from pip and serve model
# Install vLLM from pip: pip install vllm # Start the vLLM server: vllm serve "openbmb/MiniCPM-V-4" # Call the server using curl (OpenAI-compatible API): curl -X POST "http://localhost:8000/v1/chat/completions" \ -H "Content-Type: application/json" \ --data '{ "model": "openbmb/MiniCPM-V-4", "messages": [ { "role": "user", "content": [ { "type": "text", "text": "Describe this image in one sentence." }, { "type": "image_url", "image_url": { "url": "https://cdn.britannica.com/61/93061-050-99147DCE/Statue-of-Liberty-Island-New-York-Bay.jpg" } } ] } ] }'Use Docker
docker model run hf.co/openbmb/MiniCPM-V-4
- SGLang
How to use openbmb/MiniCPM-V-4 with SGLang:
Install from pip and serve model
# Install SGLang from pip: pip install sglang # Start the SGLang server: python3 -m sglang.launch_server \ --model-path "openbmb/MiniCPM-V-4" \ --host 0.0.0.0 \ --port 30000 # Call the server using curl (OpenAI-compatible API): curl -X POST "http://localhost:30000/v1/chat/completions" \ -H "Content-Type: application/json" \ --data '{ "model": "openbmb/MiniCPM-V-4", "messages": [ { "role": "user", "content": [ { "type": "text", "text": "Describe this image in one sentence." }, { "type": "image_url", "image_url": { "url": "https://cdn.britannica.com/61/93061-050-99147DCE/Statue-of-Liberty-Island-New-York-Bay.jpg" } } ] } ] }'Use Docker images
docker run --gpus all \ --shm-size 32g \ -p 30000:30000 \ -v ~/.cache/huggingface:/root/.cache/huggingface \ --env "HF_TOKEN=<secret>" \ --ipc=host \ lmsysorg/sglang:latest \ python3 -m sglang.launch_server \ --model-path "openbmb/MiniCPM-V-4" \ --host 0.0.0.0 \ --port 30000 # Call the server using curl (OpenAI-compatible API): curl -X POST "http://localhost:30000/v1/chat/completions" \ -H "Content-Type: application/json" \ --data '{ "model": "openbmb/MiniCPM-V-4", "messages": [ { "role": "user", "content": [ { "type": "text", "text": "Describe this image in one sentence." }, { "type": "image_url", "image_url": { "url": "https://cdn.britannica.com/61/93061-050-99147DCE/Statue-of-Liberty-Island-New-York-Bay.jpg" } } ] } ] }' - Docker Model Runner
How to use openbmb/MiniCPM-V-4 with Docker Model Runner:
docker model run hf.co/openbmb/MiniCPM-V-4
| import math | |
| from typing import List, Optional | |
| import json | |
| import torch | |
| import torchvision | |
| from threading import Thread | |
| from copy import deepcopy | |
| from PIL import Image | |
| from transformers import AutoProcessor, TextIteratorStreamer | |
| from .configuration_minicpm import MiniCPMVConfig | |
| from transformers import LlamaForCausalLM, LlamaPreTrainedModel | |
| from .modeling_navit_siglip import SiglipVisionTransformer | |
| from .resampler import Resampler | |
| class MiniCPMVPreTrainedModel(LlamaPreTrainedModel): | |
| config_class = MiniCPMVConfig | |
| class MiniCPMV(MiniCPMVPreTrainedModel): | |
| def __init__(self, config): | |
| super().__init__(config) | |
| self.llm = LlamaForCausalLM(config) | |
| self.vpm = self.init_vision_module() | |
| self.vision_dim = self.vpm.embed_dim | |
| self.embed_dim = self.llm.config.hidden_size | |
| self.resampler = self.init_resampler(self.embed_dim, self.vision_dim) | |
| self.processor = None | |
| self.terminators = ['<|im_end|>', '</s>'] | |
| def init_vision_module(self): | |
| # same as HuggingFaceM4/siglip-so400m-14-980-flash-attn2-navit add tgt_sizes | |
| if self.config._attn_implementation == 'flash_attention_2': | |
| self.config.vision_config._attn_implementation = 'flash_attention_2' | |
| else: | |
| # not suport sdpa | |
| self.config.vision_config._attn_implementation = 'eager' | |
| model = SiglipVisionTransformer(self.config.vision_config) | |
| if self.config.drop_vision_last_layer: | |
| model.encoder.layers = model.encoder.layers[:-1] | |
| setattr(model, 'embed_dim', model.embeddings.embed_dim) | |
| setattr(model, 'patch_size', model.embeddings.patch_size) | |
| return model | |
| def init_resampler(self, embed_dim, vision_dim): | |
| return Resampler( | |
| num_queries=self.config.query_num, | |
| embed_dim=embed_dim, | |
| num_heads=embed_dim // 128, | |
| kv_dim=vision_dim, | |
| adaptive=True | |
| ) | |
| def get_input_embeddings(self): | |
| return self.llm.get_input_embeddings() | |
| def set_input_embeddings(self, value): | |
| self.llm.embed_tokens = value | |
| def get_output_embeddings(self): | |
| return self.llm.lm_head | |
| def set_output_embeddings(self, new_embeddings): | |
| self.llm.lm_head = new_embeddings | |
| def set_decoder(self, decoder): | |
| self.llm = decoder | |
| def get_decoder(self): | |
| return self.llm | |
| def get_vllm_embedding(self, data): | |
| if 'vision_hidden_states' not in data: | |
| dtype = self.llm.model.embed_tokens.weight.dtype | |
| device = self.llm.model.embed_tokens.weight.device | |
| tgt_sizes = data['tgt_sizes'] | |
| pixel_values_list = data['pixel_values'] | |
| vision_hidden_states = [] | |
| all_pixel_values = [] | |
| img_cnt = [] | |
| for pixel_values in pixel_values_list: | |
| img_cnt.append(len(pixel_values)) | |
| all_pixel_values.extend([i.flatten(end_dim=1).permute(1, 0) for i in pixel_values]) | |
| # exist image | |
| if all_pixel_values: | |
| tgt_sizes = [tgt_size for tgt_size in tgt_sizes if isinstance(tgt_size, torch.Tensor)] | |
| tgt_sizes = torch.vstack(tgt_sizes).type(torch.int32) | |
| max_patches = torch.max(tgt_sizes[:, 0] * tgt_sizes[:, 1]) | |
| all_pixel_values = torch.nn.utils.rnn.pad_sequence(all_pixel_values, batch_first=True, | |
| padding_value=0.0) | |
| B, L, _ = all_pixel_values.shape | |
| all_pixel_values = all_pixel_values.permute(0, 2, 1).reshape(B, 3, -1, L) | |
| patch_attn_mask = torch.zeros((B, 1, max_patches), dtype=torch.bool, device=device) | |
| for i in range(B): | |
| patch_attn_mask[i, 0, :tgt_sizes[i][0] * tgt_sizes[i][1]] = True | |
| vision_batch_size = self.config.vision_batch_size | |
| all_pixel_values = all_pixel_values.type(dtype).to(device=device) | |
| if B > vision_batch_size: | |
| hs = [] | |
| for i in range(0, B, vision_batch_size): | |
| start_idx = i | |
| end_idx = i + vision_batch_size | |
| tmp_hs = self.vpm(all_pixel_values[start_idx:end_idx], patch_attention_mask=patch_attn_mask[start_idx:end_idx], tgt_sizes=tgt_sizes[start_idx:end_idx]).last_hidden_state | |
| hs.append(tmp_hs) | |
| vision_embedding = torch.cat(hs, dim=0) | |
| else: | |
| vision_embedding = self.vpm(all_pixel_values, patch_attention_mask=patch_attn_mask, tgt_sizes=tgt_sizes).last_hidden_state | |
| vision_embedding = self.resampler(vision_embedding, tgt_sizes) | |
| start = 0 | |
| for pixel_values in pixel_values_list: | |
| img_cnt = len(pixel_values) | |
| if img_cnt > 0: | |
| vision_hidden_states.append(vision_embedding[start: start + img_cnt]) | |
| start += img_cnt | |
| else: | |
| vision_hidden_states.append([]) | |
| else: # no image | |
| if self.training: | |
| dummy_image = torch.zeros( | |
| (1, 3, 224, 224), | |
| device=device, dtype=dtype | |
| ) | |
| tgt_sizes = torch.Tensor([[(224 // self.config.patch_size), math.ceil(224 / self.config.patch_size)]]).type(torch.int32) | |
| dummy_feature = self.resampler(self.vpm(dummy_image).last_hidden_state, tgt_sizes) | |
| else: | |
| dummy_feature = [] | |
| for _ in range(len(pixel_values_list)): | |
| vision_hidden_states.append(dummy_feature) | |
| else: | |
| vision_hidden_states = data['vision_hidden_states'] | |
| if hasattr(self.llm.config, 'scale_emb'): | |
| vllm_embedding = self.llm.model.embed_tokens(data['input_ids']) * self.llm.config.scale_emb | |
| else: | |
| vllm_embedding = self.llm.model.embed_tokens(data['input_ids']) | |
| vision_hidden_states = [i.type(vllm_embedding.dtype) if isinstance( | |
| i, torch.Tensor) else i for i in vision_hidden_states] | |
| bs = len(data['input_ids']) | |
| device = vllm_embedding.device | |
| embed_dim = vllm_embedding.shape[-1] | |
| new_vllm_embeddings = [] | |
| for i in range(bs): | |
| cur_vs_hs = vision_hidden_states[i] | |
| cur_vllm_emb = vllm_embedding[i] | |
| if len(cur_vs_hs) == 0: | |
| new_vllm_embeddings.append(cur_vllm_emb) | |
| continue | |
| cur_image_bound = data['image_bound'][i] | |
| if len(cur_image_bound) > 0: | |
| image_indices = torch.stack([ | |
| torch.arange(r[0], r[1], dtype=torch.long) | |
| for r in cur_image_bound | |
| ], dim=0).flatten().to(device) | |
| indices_expanded = image_indices.view(-1, 1).expand(-1, embed_dim) | |
| vision_features = cur_vs_hs.view(-1, embed_dim) | |
| updated_emb = cur_vllm_emb.scatter(0, indices_expanded, vision_features) | |
| new_vllm_embeddings.append(updated_emb) | |
| elif self.training: | |
| dummy_term = cur_vs_hs[0].sum() * 0 | |
| new_vllm_embeddings.append(cur_vllm_emb + dummy_term) | |
| else: | |
| new_vllm_embeddings.append(cur_vllm_emb) | |
| vllm_embedding = torch.stack(new_vllm_embeddings, dim=0) | |
| return vllm_embedding, vision_hidden_states | |
| def forward(self, data=None, **kwargs): | |
| if isinstance(data, torch.Tensor): | |
| attention_mask = torch.ones_like(data, dtype=torch.bool) | |
| kwargs = {'attention_mask': attention_mask} | |
| return self.llm( | |
| input_ids=data, | |
| **kwargs | |
| ) | |
| if data is None: | |
| data = { | |
| "input_ids": kwargs.pop("input_ids", None), | |
| "pixel_values": kwargs.pop("pixel_values", None), | |
| "image_bound": kwargs.pop("image_bound", None), | |
| "tgt_sizes": kwargs.pop("tgt_sizes", None), | |
| "position_ids": kwargs.pop("position_ids", None), | |
| } | |
| else: | |
| kwargs.pop("input_ids", None) | |
| kwargs.pop("pixel_values", None) | |
| kwargs.pop("image_bound", None) | |
| kwargs.pop("tgt_sizes", None) | |
| kwargs.pop("position_ids", None) | |
| kwargs.pop("inputs_embeds", None) | |
| vllm_embedding, vision_hidden_states = self.get_vllm_embedding(data) | |
| position_ids = data["position_ids"] | |
| if position_ids.dtype != torch.int64: | |
| position_ids = position_ids.long() | |
| return self.llm( | |
| input_ids=None, | |
| position_ids=position_ids, | |
| inputs_embeds=vllm_embedding, | |
| **kwargs | |
| ) | |
| def _decode(self, inputs_embeds, tokenizer, attention_mask, decode_text=False, **kwargs): | |
| terminators = [tokenizer.convert_tokens_to_ids(i) for i in self.terminators] | |
| output = self.llm.generate( | |
| inputs_embeds=inputs_embeds, | |
| pad_token_id=0, | |
| eos_token_id=terminators, | |
| attention_mask=attention_mask, | |
| **kwargs | |
| ) | |
| if decode_text: | |
| return self._decode_text(output, tokenizer) | |
| return output | |
| def _decode_stream(self, inputs_embeds, tokenizer, **kwargs): | |
| terminators = [tokenizer.convert_tokens_to_ids(i) for i in self.terminators] | |
| streamer = TextIteratorStreamer(tokenizer=tokenizer) | |
| generation_kwargs = { | |
| 'inputs_embeds': inputs_embeds, | |
| 'pad_token_id': 0, | |
| 'eos_token_id': terminators, | |
| 'streamer': streamer | |
| } | |
| generation_kwargs.update(kwargs) | |
| thread = Thread(target=self.llm.generate, kwargs=generation_kwargs) | |
| thread.start() | |
| return streamer | |
| def _decode_text(self, result_ids, tokenizer): | |
| terminators = [tokenizer.convert_tokens_to_ids(i) for i in self.terminators] | |
| result_text = [] | |
| for result in result_ids: | |
| result = result[result != 0] | |
| if result[0] == tokenizer.bos_id: | |
| result = result[1:] | |
| if result[-1] in terminators: | |
| result = result[:-1] | |
| result_text.append(tokenizer.decode(result).strip()) | |
| return result_text | |
| def generate( | |
| self, | |
| input_ids=None, | |
| pixel_values=None, | |
| tgt_sizes=None, | |
| image_bound=None, | |
| attention_mask=None, | |
| tokenizer=None, | |
| vision_hidden_states=None, | |
| return_vision_hidden_states=False, | |
| stream=False, | |
| decode_text=False, | |
| **kwargs | |
| ): | |
| assert input_ids is not None | |
| assert len(input_ids) == len(pixel_values) | |
| model_inputs = { | |
| "input_ids": input_ids, | |
| "image_bound": image_bound, | |
| } | |
| if vision_hidden_states is None: | |
| model_inputs["pixel_values"] = pixel_values | |
| model_inputs['tgt_sizes'] = tgt_sizes | |
| else: | |
| model_inputs["vision_hidden_states"] = vision_hidden_states | |
| with torch.inference_mode(): | |
| ( | |
| model_inputs["inputs_embeds"], | |
| vision_hidden_states, | |
| ) = self.get_vllm_embedding(model_inputs) | |
| if stream: | |
| result = self._decode_stream(model_inputs["inputs_embeds"], tokenizer, **kwargs) | |
| else: | |
| result = self._decode(model_inputs["inputs_embeds"], tokenizer, attention_mask, decode_text=decode_text, **kwargs) | |
| if return_vision_hidden_states: | |
| return result, vision_hidden_states | |
| return result | |
| def chat( | |
| self, | |
| image=None, | |
| msgs=None, | |
| tokenizer=None, | |
| processor=None, | |
| vision_hidden_states=None, | |
| max_new_tokens=2048, | |
| min_new_tokens=0, | |
| sampling=True, | |
| max_inp_length=32768, | |
| system_prompt='', | |
| stream=False, | |
| max_slice_nums=None, | |
| use_image_id=None, | |
| **kwargs | |
| ): | |
| if isinstance(msgs[0], list): | |
| batched = True | |
| else: | |
| batched = False | |
| msgs_list = msgs | |
| images_list = image | |
| if batched is False: | |
| images_list, msgs_list = [images_list], [msgs_list] | |
| else: | |
| assert images_list is None, "Please integrate image to msgs when using batch inference." | |
| images_list = [None] * len(msgs_list) | |
| assert len(images_list) == len(msgs_list), "The batch dim of images_list and msgs_list should be the same." | |
| if processor is None: | |
| if self.processor is None: | |
| self.processor = AutoProcessor.from_pretrained(self.config._name_or_path, trust_remote_code=True) | |
| processor = self.processor | |
| assert self.config.query_num == processor.image_processor.image_feature_size, "These two values should be the same. Check `config.json` and `preprocessor_config.json`." | |
| assert self.config.patch_size == processor.image_processor.patch_size, "These two values should be the same. Check `config.json` and `preprocessor_config.json`." | |
| assert self.config.use_image_id == processor.image_processor.use_image_id, "These two values should be the same. Check `config.json` and `preprocessor_config.json`." | |
| assert self.config.slice_config.max_slice_nums == processor.image_processor.max_slice_nums, "These two values should be the same. Check `config.json` and `preprocessor_config.json`." | |
| assert self.config.slice_mode == processor.image_processor.slice_mode, "These two values should be the same. Check `config.json` and `preprocessor_config.json`." | |
| prompts_lists = [] | |
| input_images_lists = [] | |
| for image, msgs in zip(images_list, msgs_list): | |
| if isinstance(msgs, str): | |
| msgs = json.loads(msgs) | |
| copy_msgs = deepcopy(msgs) | |
| assert len(msgs) > 0, "msgs is empty" | |
| assert sampling or not stream, "if use stream mode, make sure sampling=True" | |
| if image is not None and isinstance(copy_msgs[0]["content"], str): | |
| copy_msgs[0]["content"] = [image, copy_msgs[0]["content"]] | |
| images = [] | |
| for i, msg in enumerate(copy_msgs): | |
| role = msg["role"] | |
| content = msg["content"] | |
| assert role in ["user", "assistant"] | |
| if i == 0: | |
| assert role == "user", "The role of first msg should be user" | |
| if isinstance(content, str): | |
| content = [content] | |
| cur_msgs = [] | |
| for c in content: | |
| if isinstance(c, Image.Image): | |
| images.append(c) | |
| cur_msgs.append("(<image>./</image>)") | |
| elif isinstance(c, str): | |
| cur_msgs.append(c) | |
| msg["content"] = "\n".join(cur_msgs) | |
| if system_prompt: | |
| sys_msg = {'role': 'system', 'content': system_prompt} | |
| copy_msgs = [sys_msg] + copy_msgs | |
| prompts_lists.append(processor.tokenizer.apply_chat_template(copy_msgs, tokenize=False, add_generation_prompt=True)) | |
| input_images_lists.append(images) | |
| inputs = processor( | |
| prompts_lists, | |
| input_images_lists, | |
| max_slice_nums=max_slice_nums, | |
| use_image_id=use_image_id, | |
| return_tensors="pt", | |
| max_length=max_inp_length | |
| ).to(self.device) | |
| if sampling: | |
| generation_config = { | |
| "top_p": 0.8, | |
| "top_k": 100, | |
| "temperature": 0.7, | |
| "do_sample": True, | |
| "repetition_penalty": 1.05 | |
| } | |
| else: | |
| generation_config = { | |
| "num_beams": 3, | |
| "repetition_penalty": 1.2, | |
| } | |
| if min_new_tokens > 0: | |
| generation_config['min_new_tokens'] = min_new_tokens | |
| generation_config.update( | |
| (k, kwargs[k]) for k in generation_config.keys() & kwargs.keys() | |
| ) | |
| inputs.pop("image_sizes") | |
| with torch.inference_mode(): | |
| res = self.generate( | |
| **inputs, | |
| tokenizer=tokenizer, | |
| max_new_tokens=max_new_tokens, | |
| vision_hidden_states=vision_hidden_states, | |
| stream=stream, | |
| decode_text=True, | |
| **generation_config | |
| ) | |
| if stream: | |
| def stream_gen(): | |
| for text in res: | |
| for term in self.terminators: | |
| text = text.replace(term, '') | |
| yield text | |
| return stream_gen() | |
| else: | |
| if batched: | |
| answer = res | |
| else: | |
| answer = res[0] | |
| return answer | |