v2 / vlib / mcp / mcp.v
864 lines · 778 sloc · 22.42 KB · 07184ccaf84ba1305cf58a3b1a689b7a735c7c55
Raw
1module mcp
2
3import json
4import net.http
5import os
6import time
7
8pub const jsonrpc_version = '2.0'
9pub const protocol_version = '2025-11-25'
10pub const parse_error = ResponseError{
11 code: -32700
12 message: 'Invalid JSON.'
13}
14pub const invalid_request = ResponseError{
15 code: -32600
16 message: 'Invalid request.'
17}
18pub const method_not_found = ResponseError{
19 code: -32601
20 message: 'Method not found.'
21}
22pub const invalid_params = ResponseError{
23 code: -32602
24 message: 'Invalid params.'
25}
26pub const internal_error = ResponseError{
27 code: -32603
28 message: 'Internal error.'
29}
30pub const server_not_initialized = ResponseError{
31 code: -32002
32 message: 'Server not initialized.'
33}
34
35const default_content_type = 'application/json'
36const streamable_http_accept = 'application/json, text/event-stream'
37const content_length_header = 'Content-Length'
38const mcp_session_id_header = 'Mcp-Session-Id'
39const process_poll_interval = 5 * time.millisecond
40const default_client_name = 'v.mcp'
41const default_client_version = 'dev'
42
43// ResponseError is the JSON-RPC error payload used by MCP responses.
44pub struct ResponseError {
45pub:
46 code int
47 message string
48 data string @[raw]
49}
50
51// code returns the JSON-RPC error code.
52pub fn (err ResponseError) code() int {
53 return err.code
54}
55
56// msg returns the JSON-RPC error message.
57pub fn (err ResponseError) msg() string {
58 return err.message
59}
60
61// err casts the response error to `IError`.
62pub fn (err ResponseError) err() IError {
63 return IError(err)
64}
65
66// Null represents the JSON `null` literal.
67pub struct Null {}
68
69// str returns the JSON `null` literal.
70pub fn (n Null) str() string {
71 return 'null'
72}
73
74pub const null = Null{}
75
76// Empty omits a JSON-RPC field when used with MCP helpers.
77pub struct Empty {}
78
79// str returns the empty string.
80pub fn (e Empty) str() string {
81 return ''
82}
83
84pub const empty = Empty{}
85
86// EmptyObject encodes to an empty JSON object.
87pub struct EmptyObject {}
88
89// str returns the JSON empty object literal.
90pub fn (e EmptyObject) str() string {
91 return '{}'
92}
93
94pub const empty_object = EmptyObject{}
95
96// Implementation identifies an MCP client or server implementation.
97pub struct Implementation {
98pub:
99 name string
100 version string
101}
102
103// InitializeParams is the typed payload for the `initialize` request.
104pub struct InitializeParams {
105pub:
106 protocol_version string @[json: protocolVersion]
107 capabilities string @[raw]
108 client_info Implementation @[json: clientInfo]
109}
110
111// InitializeResult is the typed result returned by an MCP server after initialization.
112pub struct InitializeResult {
113pub:
114 protocol_version string @[json: protocolVersion]
115 capabilities string @[raw]
116 server_info Implementation @[json: serverInfo]
117 instructions string
118}
119
120// Request is a JSON-RPC request message encoded for MCP.
121pub struct Request {
122pub:
123 jsonrpc string = jsonrpc_version
124 id string @[raw]
125 method string
126 params string @[omitempty; raw]
127}
128
129// new_request constructs an MCP request with a typed id and params payload.
130pub fn new_request[I, P](id I, method string, params P) Request {
131 return Request{
132 id: encode_id(id)
133 method: method
134 params: encode_value(params)
135 }
136}
137
138// encode serializes the request to JSON.
139pub fn (req Request) encode() string {
140 params_payload := if req.params.len == 0 { '' } else { ',"params":${req.params}' }
141 id_payload := if req.id.len == 0 { null.str() } else { req.id }
142 return '{"jsonrpc":"${jsonrpc_version}","id":${id_payload},"method":${json.encode(req.method)}${params_payload}}'
143}
144
145// decode_params decodes the raw request params into `T`.
146pub fn (req Request) decode_params[T]() !T {
147 return decode_value[T](req.params)
148}
149
150// Notification is a JSON-RPC notification encoded for MCP.
151pub struct Notification {
152pub:
153 jsonrpc string = jsonrpc_version
154 method string
155 params string @[omitempty; raw]
156}
157
158// new_notification constructs an MCP notification with a typed params payload.
159pub fn new_notification[P](method string, params P) Notification {
160 return Notification{
161 method: method
162 params: encode_value(params)
163 }
164}
165
166// encode serializes the notification to JSON.
167pub fn (notification Notification) encode() string {
168 params_payload := if notification.params.len == 0 {
169 ''
170 } else {
171 ',"params":${notification.params}'
172 }
173 return '{"jsonrpc":"${jsonrpc_version}","method":${json.encode(notification.method)}${params_payload}}'
174}
175
176// decode_params decodes the raw notification params into `T`.
177pub fn (notification Notification) decode_params[T]() !T {
178 return decode_value[T](notification.params)
179}
180
181// Response is a JSON-RPC response message encoded for MCP.
182pub struct Response {
183pub:
184 jsonrpc string = jsonrpc_version
185 id string @[raw]
186 result string @[raw]
187 error ResponseError
188}
189
190// new_response constructs an MCP response with a typed id and result payload.
191pub fn new_response[I, R](id I, result R, err ResponseError) Response {
192 return Response{
193 id: encode_id(id)
194 result: if err.code != 0 { '' } else { encode_value(result) }
195 error: err
196 }
197}
198
199// encode serializes the response to JSON.
200pub fn (resp Response) encode() string {
201 mut payload := '{"jsonrpc":"${jsonrpc_version}"'
202 if resp.error.code != 0 {
203 payload += ',"error":' + json.encode(resp.error)
204 } else {
205 result_payload := if resp.result.len == 0 { null.str() } else { resp.result }
206 payload += ',"result":' + result_payload
207 }
208 id_payload := if resp.id.len == 0 { null.str() } else { resp.id }
209 return payload + ',"id":${id_payload}}'
210}
211
212// decode_result decodes the response result into `T`.
213pub fn (resp Response) decode_result[T]() !T {
214 if resp.error.code != 0 {
215 return resp.error.err()
216 }
217 return decode_value[T](resp.result)
218}
219
220// decode_request decodes a JSON payload into an MCP request.
221pub fn decode_request(raw string) !Request {
222 return json.decode(Request, raw) or { return err }
223}
224
225// decode_notification decodes a JSON payload into an MCP notification.
226pub fn decode_notification(raw string) !Notification {
227 return json.decode(Notification, raw) or { return err }
228}
229
230// decode_response decodes a JSON payload into an MCP response.
231pub fn decode_response(raw string) !Response {
232 return json.decode(Response, raw) or { return err }
233}
234
235struct MessageEnvelope {
236 jsonrpc string
237 id string @[raw]
238 method string
239 params string @[raw]
240 result string @[raw]
241 error ResponseError
242}
243
244fn (env MessageEnvelope) encode() string {
245 if env.method.len != 0 {
246 if env.id.len == 0 || env.id == null.str() {
247 return Notification{
248 method: env.method
249 params: env.params
250 }.encode()
251 }
252 return Request{
253 id: env.id
254 method: env.method
255 params: env.params
256 }.encode()
257 }
258 return Response{
259 id: env.id
260 result: env.result
261 error: env.error
262 }.encode()
263}
264
265fn decode_envelope(raw string) !MessageEnvelope {
266 return json.decode(MessageEnvelope, raw) or { return err }
267}
268
269// Transport is the boundary between MCP messages and the wire format.
270pub interface Transport {
271mut:
272 send(message string) !
273 receive() !string
274 close()
275}
276
277@[params]
278pub struct ClientConfig {
279pub mut:
280 protocol_version string = protocol_version
281 client_info Implementation = Implementation{
282 name: default_client_name
283 version: default_client_version
284 }
285 capabilities string = '{}'
286 headers map[string]string
287}
288
289pub struct Client {
290mut:
291 transport Transport
292 config ClientConfig
293 next_id int = 1
294 initialized bool
295 init_result InitializeResult
296 pending_responses map[string]Response
297 notifications []Notification
298 server_requests []Request
299}
300
301// new_client constructs an MCP client on top of a custom transport.
302pub fn new_client(transport Transport, config ClientConfig) Client {
303 return Client{
304 transport: transport
305 config: config
306 pending_responses: map[string]Response{}
307 }
308}
309
310// connect creates an MCP client for a streamable HTTP endpoint.
311pub fn connect(url string) !Client {
312 return connect_http(url, ClientConfig{})
313}
314
315// connect_http creates an MCP client for a streamable HTTP endpoint.
316pub fn connect_http(url string, config ClientConfig) !Client {
317 transport := new_http_transport(url, config)!
318 return new_client(transport, config)
319}
320
321// connect_stdio creates an MCP client that talks to a local stdio server process.
322pub fn connect_stdio(command string, args []string, config ClientConfig) !Client {
323 transport := new_process_transport(command, args)!
324 return new_client(transport, config)
325}
326
327// initialize starts the MCP initialization handshake using the client's config.
328pub fn (mut c Client) initialize() !InitializeResult {
329 return c.initialize_with_raw(c.config.capabilities, c.config.client_info)
330}
331
332// initialize_with starts the MCP initialization handshake using typed capabilities.
333pub fn (mut c Client) initialize_with[X](capabilities X, client_info Implementation) !InitializeResult {
334 return c.initialize_with_raw(encode_value(capabilities), client_info)
335}
336
337// send_request sends a typed request and waits for its response.
338pub fn (mut c Client) send_request(request Request) !Response {
339 if request.method == 'initialize' {
340 return error('mcp.Client.initialize must be used for the MCP handshake')
341 }
342 c.ensure_initialized()!
343 c.transport.send(request.encode())!
344 return c.wait_for_response(request.id)
345}
346
347// request_message sends a method call and returns the raw MCP response.
348pub fn (mut c Client) request_message[P](method string, params P) !Response {
349 request := new_request(c.next_request_id(), method, params)
350 return c.send_request(request)
351}
352
353// request sends a method call and decodes its result into `Result`.
354pub fn (mut c Client) request[P, R](method string, params P) !R {
355 response := c.request_message(method, params)!
356 result := response.decode_result[R]()!
357 return result
358}
359
360// send_notification sends a typed notification message.
361pub fn (mut c Client) send_notification(notification Notification) ! {
362 if notification.method == 'notifications/initialized' {
363 return error('notifications/initialized is sent automatically after initialization')
364 }
365 c.ensure_initialized()!
366 c.transport.send(notification.encode())!
367}
368
369// notify sends a method notification with a typed params payload.
370pub fn (mut c Client) notify[P](method string, params P) ! {
371 c.send_notification(new_notification(method, params))!
372}
373
374// take_notifications drains notifications queued while waiting for responses.
375pub fn (mut c Client) take_notifications() []Notification {
376 if c.notifications.len == 0 {
377 return []Notification{}
378 }
379 drained := c.notifications.clone()
380 c.notifications = []Notification{}
381 return drained
382}
383
384// take_requests drains server initiated requests queued while waiting for responses.
385pub fn (mut c Client) take_requests() []Request {
386 if c.server_requests.len == 0 {
387 return []Request{}
388 }
389 drained := c.server_requests.clone()
390 c.server_requests = []Request{}
391 return drained
392}
393
394// close releases the underlying transport.
395pub fn (mut c Client) close() {
396 c.transport.close()
397}
398
399fn (mut c Client) initialize_with_raw(capabilities string, client_info Implementation) !InitializeResult {
400 if c.initialized {
401 return c.init_result
402 }
403 c.config.capabilities = normalize_capabilities(capabilities)
404 c.config.client_info = normalize_client_info(client_info)
405 params := InitializeParams{
406 protocol_version: normalize_protocol_version(c.config.protocol_version)
407 capabilities: c.config.capabilities
408 client_info: c.config.client_info
409 }
410 request := Request{
411 id: encode_id(c.next_request_id())
412 method: 'initialize'
413 params: encode_initialize_params(params)
414 }
415 c.transport.send(request.encode())!
416 response := c.wait_for_response(request.id)!
417 result := response.decode_result[InitializeResult]()!
418 c.transport.send(new_notification('notifications/initialized', empty).encode())!
419 c.initialized = true
420 c.init_result = result
421 return result
422}
423
424fn (mut c Client) ensure_initialized() ! {
425 if !c.initialized {
426 c.initialize()!
427 }
428}
429
430fn (mut c Client) next_request_id() int {
431 request_id := c.next_id
432 c.next_id++
433 return request_id
434}
435
436fn (mut c Client) wait_for_response(expected_id string) !Response {
437 if expected_id in c.pending_responses {
438 response := c.pending_responses[expected_id]
439 c.pending_responses.delete(expected_id)
440 return response
441 }
442 for {
443 raw_message := c.transport.receive()!
444 envelope := decode_envelope(raw_message)!
445 if envelope.method.len != 0 {
446 if envelope.id.len == 0 || envelope.id == null.str() {
447 c.notifications << Notification{
448 method: envelope.method
449 params: envelope.params
450 }
451 } else {
452 c.server_requests << Request{
453 id: envelope.id
454 method: envelope.method
455 params: envelope.params
456 }
457 }
458 continue
459 }
460 response := Response{
461 id: envelope.id
462 result: envelope.result
463 error: envelope.error
464 }
465 if response.id == expected_id {
466 return response
467 }
468 c.pending_responses[response.id] = response
469 }
470 return error('mcp: response loop exited unexpectedly')
471}
472
473fn encode_initialize_params(params InitializeParams) string {
474 return '{"protocolVersion":${json.encode(params.protocol_version)},"capabilities":${normalize_capabilities(params.capabilities)},"clientInfo":${json.encode(params.client_info)}}'
475}
476
477struct NoFrameError {}
478
479fn (err NoFrameError) msg() string {
480 return 'no complete frame available'
481}
482
483fn (err NoFrameError) code() int {
484 return 0
485}
486
487struct FrameExtraction {
488 message string
489 remaining string
490}
491
492struct HttpTransport {
493mut:
494 url string
495 header http.Header
496 session_id string
497 pending []string
498}
499
500fn new_http_transport(url string, config ClientConfig) !HttpTransport {
501 if url == '' {
502 return error('mcp.connect_http: empty url')
503 }
504 if !url.starts_with('http://') && !url.starts_with('https://') {
505 return error('mcp.connect_http: expected an http:// or https:// MCP endpoint')
506 }
507 mut header := http.new_header()
508 header.set(.user_agent, default_client_name)
509 if config.headers.len != 0 {
510 header.add_custom_map(config.headers)!
511 }
512 return HttpTransport{
513 url: url
514 header: header
515 }
516}
517
518fn (mut transport HttpTransport) send(message string) ! {
519 mut header := transport.header
520 header.set(.content_type, default_content_type)
521 header.set(.accept, streamable_http_accept)
522 if transport.session_id != '' {
523 header.set_custom(mcp_session_id_header, transport.session_id)!
524 }
525 response := http.fetch(
526 method: .post
527 url: transport.url
528 data: message
529 header: header
530 )!
531 if session_id := response.header.get_custom(mcp_session_id_header) {
532 transport.session_id = session_id
533 }
534 messages := parse_http_response_messages(response)!
535 if messages.len != 0 {
536 transport.pending << messages
537 return
538 }
539 if response.status_code >= 400 {
540 return error('mcp.http: server returned HTTP ${response.status_code} without an MCP payload')
541 }
542}
543
544fn (mut transport HttpTransport) receive() !string {
545 if transport.pending.len == 0 {
546 return error('mcp.http: no pending messages are available')
547 }
548 message := transport.pending[0]
549 transport.pending = if transport.pending.len == 1 {
550 []string{}
551 } else {
552 transport.pending[1..].clone()
553 }
554 return message
555}
556
557fn (mut transport HttpTransport) close() {
558 if transport.session_id == '' {
559 return
560 }
561 mut header := transport.header
562 header.set_custom(mcp_session_id_header, transport.session_id) or { return }
563 http.fetch(
564 method: .delete
565 url: transport.url
566 header: header
567 ) or {}
568 transport.session_id = ''
569}
570
571struct ProcessTransport {
572mut:
573 process &os.Process
574 buffer string
575}
576
577fn new_process_transport(command string, args []string) !ProcessTransport {
578 if command == '' {
579 return error('mcp.connect_stdio: empty command')
580 }
581 mut process := os.new_process(command)
582 process.set_args(args)
583 process.set_redirect_stdio()
584 process.run()
585 return ProcessTransport{
586 process: process
587 }
588}
589
590fn (mut transport ProcessTransport) send(message string) ! {
591 transport.process.stdin_write(encode_framed_message(message))
592}
593
594fn (mut transport ProcessTransport) receive() !string {
595 for {
596 frame := try_extract_framed_message(transport.buffer) or {
597 if err.msg() != NoFrameError{}.msg() {
598 return err
599 }
600 FrameExtraction{}
601 }
602 if frame.message.len != 0 {
603 transport.buffer = frame.remaining
604 return frame.message
605 }
606 if transport.process.is_pending(.stdout) {
607 chunk := transport.process.stdout_read()
608 if chunk.len != 0 {
609 transport.buffer += chunk
610 continue
611 }
612 }
613 if !transport.process.is_alive() {
614 transport.buffer += transport.process.stdout_slurp()
615 frame_after_exit := try_extract_framed_message(transport.buffer) or {
616 if err.msg() != NoFrameError{}.msg() {
617 return err
618 }
619 FrameExtraction{}
620 }
621 if frame_after_exit.message.len != 0 {
622 transport.buffer = frame_after_exit.remaining
623 return frame_after_exit.message
624 }
625 stderr_output := transport.process.stderr_slurp().trim_space()
626 if stderr_output.len != 0 {
627 return error('mcp.stdio: process exited before a full MCP message was received: ${stderr_output}')
628 }
629 return error('mcp.stdio: process exited before a full MCP message was received')
630 }
631 time.sleep(process_poll_interval)
632 }
633 return error('mcp.stdio: receive loop exited unexpectedly')
634}
635
636fn (mut transport ProcessTransport) close() {
637 if transport.process.is_alive() {
638 transport.process.signal_term()
639 for _ in 0 .. 20 {
640 if !transport.process.is_alive() {
641 break
642 }
643 time.sleep(10 * time.millisecond)
644 }
645 if transport.process.is_alive() {
646 transport.process.signal_kill()
647 } else if transport.process.status in [.running, .stopped] {
648 transport.process.wait()
649 }
650 }
651 transport.process.close()
652}
653
654fn parse_http_response_messages(response http.Response) ![]string {
655 content_type := response.header.get(.content_type) or { '' }
656 body := response.body.trim_space()
657 if body.len == 0 {
658 return []string{}
659 }
660 content_type_lower := content_type.to_lower()
661 if content_type_lower.starts_with('application/json')
662 || (content_type == '' && is_json_payload(body)) {
663 return split_json_payloads(body)
664 }
665 if content_type_lower.starts_with('text/event-stream') {
666 return parse_sse_messages(body)
667 }
668 return error('mcp.http: unsupported content type `${content_type}`')
669}
670
671fn split_json_payloads(body string) ![]string {
672 trimmed := body.trim_space()
673 if trimmed.len == 0 {
674 return []string{}
675 }
676 if trimmed[0] != `[` {
677 return [trimmed]
678 }
679 envelopes := json.decode([]MessageEnvelope, trimmed) or { return err }
680 mut messages := []string{cap: envelopes.len}
681 for envelope in envelopes {
682 messages << envelope.encode()
683 }
684 return messages
685}
686
687fn parse_sse_messages(body string) ![]string {
688 normalized := body.replace('\r\n', '\n').replace('\r', '\n')
689 mut data_lines := []string{}
690 mut messages := []string{}
691 for line in normalized.split('\n') {
692 if line.len == 0 {
693 if data_lines.len != 0 {
694 append_sse_payload(mut messages, data_lines.join('\n'))!
695 data_lines = []string{}
696 }
697 continue
698 }
699 if line.starts_with(':') {
700 continue
701 }
702 if line.starts_with('data:') {
703 mut payload := line[5..]
704 if payload.len != 0 && payload[0] == ` ` {
705 payload = payload[1..]
706 }
707 data_lines << payload
708 }
709 }
710 if data_lines.len != 0 {
711 append_sse_payload(mut messages, data_lines.join('\n'))!
712 }
713 return messages
714}
715
716fn append_sse_payload(mut messages []string, payload string) ! {
717 trimmed := payload.trim_space()
718 if !is_json_payload(trimmed) {
719 return
720 }
721 payloads := split_json_payloads(trimmed)!
722 for item in payloads {
723 messages << item
724 }
725}
726
727fn is_json_payload(payload string) bool {
728 trimmed := payload.trim_space()
729 if trimmed.len == 0 {
730 return false
731 }
732 return trimmed[0] == `{` || trimmed[0] == `[`
733}
734
735fn encode_framed_message(message string) string {
736 return '${content_length_header}: ${message.len}\r\n\r\n${message}'
737}
738
739fn try_extract_framed_message(buffer string) !FrameExtraction {
740 header_end := buffer.index('\r\n\r\n') or { return NoFrameError{} }
741 header_text := buffer[..header_end]
742 mut content_length := -1
743 for line in header_text.split('\r\n') {
744 if line.len == 0 {
745 continue
746 }
747 parts := line.split_nth(':', 2)
748 if parts.len != 2 {
749 continue
750 }
751 if parts[0].trim_space().to_lower() != content_length_header.to_lower() {
752 continue
753 }
754 length_text := parts[1].trim_space()
755 parsed_length := length_text.int()
756 if parsed_length == 0 && length_text != '0' {
757 return error('mcp.stdio: invalid Content-Length header `${length_text}`')
758 }
759 content_length = parsed_length
760 break
761 }
762 if content_length < 0 {
763 return error('mcp.stdio: missing Content-Length header')
764 }
765 body_start := header_end + 4
766 body_end := body_start + content_length
767 if buffer.len < body_end {
768 return NoFrameError{}
769 }
770 message := buffer[body_start..body_end]
771 remaining := if body_end >= buffer.len { '' } else { buffer[body_end..] }
772 return FrameExtraction{
773 message: message
774 remaining: remaining
775 }
776}
777
778fn encode_id[I](id I) string {
779 return $if I is string {
780 json.encode(id)
781 } $else $if I is int {
782 id.str()
783 } $else {
784 json.encode(id)
785 }
786}
787
788fn encode_value[T](value T) string {
789 return $if T is Empty {
790 value.str()
791 } $else $if T is EmptyObject {
792 value.str()
793 } $else $if T is Null {
794 value.str()
795 } $else $if T is string {
796 json.encode(value)
797 } $else {
798 json.encode(value)
799 }
800}
801
802fn decode_value[T](value string) !T {
803 $if T is Empty {
804 if value.len == 0 || value == null.str() {
805 return Empty{}
806 }
807 return error('mcp: expected an empty payload, got `${value}`')
808 } $else $if T is EmptyObject {
809 if value == '{}' {
810 return EmptyObject{}
811 }
812 return error('mcp: expected an empty object payload, got `${value}`')
813 } $else $if T is Null {
814 if value == null.str() {
815 return null
816 }
817 return error('mcp: expected null, got `${value}`')
818 } $else $if T is string {
819 if value.len >= 2 && value[0] == `"` && value[value.len - 1] == `"` {
820 return json.decode(string, value) or { return err }
821 }
822 return error('mcp: could not decode `${value}` into string')
823 } $else $if T is bool {
824 if value == 'true' {
825 return true
826 }
827 if value == 'false' {
828 return false
829 }
830 return error('mcp: could not decode `${value}` into bool')
831 } $else {
832 return json.decode(T, value) or { return err }
833 }
834}
835
836fn normalize_client_info(client_info Implementation) Implementation {
837 return if client_info.name == '' {
838 Implementation{
839 name: default_client_name
840 version: if client_info.version == '' {
841 default_client_version
842 } else {
843 client_info.version
844 }
845 }
846 } else if client_info.version == '' {
847 Implementation{
848 name: client_info.name
849 version: default_client_version
850 }
851 } else {
852 client_info
853 }
854}
855
856fn normalize_capabilities(capabilities string) string {
857 trimmed := capabilities.trim_space()
858 return if trimmed.len == 0 { '{}' } else { trimmed }
859}
860
861fn normalize_protocol_version(version string) string {
862 trimmed := version.trim_space()
863 return if trimmed.len == 0 { protocol_version } else { trimmed }
864}
865