// v2/vlib/x/async/tests/net_http/net_http_integration_test.v
// 73 lines · 67 sloc · 1.75 KB · 15fb60b77ea6073658aa8355b247f2e1ae03b714
1import context
2import net.http
3import time
4import x.async as xasync
5
// test_net_http_pool_handles_synthetic_requests_with_backpressure submits three
// synthetic HTTP jobs to a pool created with one worker and a queue size of one.
// The first two submissions must be accepted and each must report status 200;
// the third, submitted while the first job is still parked, must be rejected
// with the 'async: pool queue is full' error.
fn test_net_http_pool_handles_synthetic_requests_with_backpressure() {
	mut pool := xasync.new_pool(workers: 1, queue_size: 1)!
	started := chan bool{cap: 1}
	// Capacity 2 so that both release tokens can be sent without blocking.
	release := chan bool{cap: 2}
	responses := chan int{cap: 2}
	first_req := http.new_request(.get, '/first', '')
	second_req := http.new_request(.get, '/second', '')
	third_req := http.new_request(.get, '/third', '')

	// First job: announces that it is running, then parks until released.
	pool.try_submit(fn [first_req, started, release, responses] (mut ctx context.Context) ! {
		_ = ctx
		started <- true
		_ := <-release
		resp := synthetic_http_response(first_req)!
		responses <- resp.status_code
	})!
	wait_for_http_signal(started, 'first synthetic HTTP job did not start')

	// Second job: submitted while the first one is still parked on `release`
	// (presumably taking the single queue slot - confirm against the pool).
	pool.try_submit(fn [second_req, release, responses] (mut ctx context.Context) ! {
		_ = ctx
		_ := <-release
		resp := synthetic_http_response(second_req)!
		responses <- resp.status_code
	})!

	// Third job: only the rejection message is recorded here, so that the
	// cleanup below runs whether or not the submission was rejected.
	mut rejection := ''
	pool.try_submit(fn [third_req] (mut ctx context.Context) ! {
		_ = ctx
		_ = third_req
	}) or { rejection = err.msg() }

	// Always hand out both release tokens and close the pool; otherwise an
	// unexpected acceptance would leave the parked jobs waiting on `release`.
	release <- true
	release <- true
	pool.close()!

	assert rejection == 'async: pool queue is full', 'third submission was not rejected with the queue-full error'
	assert read_http_status(responses) == 200
	assert read_http_status(responses) == 200
}
44
// synthetic_http_response builds an in-memory response for `req` without any
// network traffic: status `.ok` with the request URL echoed back as the body.
// It returns an error when the request URL is empty.
fn synthetic_http_response(req http.Request) !http.Response {
	if req.url != '' {
		return http.new_response(status: .ok, body: req.url)
	}
	return error('empty synthetic HTTP URL')
}
51
// wait_for_http_signal waits up to one second for a value on `signal` and
// asserts that the received value is true. If nothing arrives within that
// time, the assertion fails with `message`.
fn wait_for_http_signal(signal chan bool, message string) {
	select {
		received := <-signal {
			assert received
		}
		1000 * time.millisecond {
			// Timeout branch: nothing arrived on `signal` within one second.
			assert false, message
		}
	}
}
62
// read_http_status waits up to one second for a status code on `responses`
// and returns it. If nothing arrives in time the assertion fails, and 0 is the
// value returned should execution continue past the failed assertion.
fn read_http_status(responses chan int) int {
	mut status := 0
	select {
		code := <-responses {
			status = code
		}
		1 * time.second {
			assert false, 'synthetic HTTP response was not produced'
		}
	}
	return status
}
74