blob: 0a766df1e3b45c83f5822744edd43a0577421cc6 [file] [log] [blame]
// SPDX-License-Identifier: Apache-2.0 or BSD-3-Clause
use std::{
io::{ErrorKind, Read, Write},
num::Wrapping,
os::unix::prelude::{AsRawFd, RawFd},
};
use log::{error, info};
use virtio_vsock::packet::{VsockPacket, PKT_HEADER_SIZE};
use vm_memory::{bitmap::BitmapSlice, Bytes, VolatileSlice};
use crate::{
rxops::*,
rxqueue::*,
txbuf::*,
vhu_vsock::{
Error, Result, VSOCK_FLAGS_SHUTDOWN_RCV, VSOCK_FLAGS_SHUTDOWN_SEND,
VSOCK_OP_CREDIT_REQUEST, VSOCK_OP_CREDIT_UPDATE, VSOCK_OP_REQUEST, VSOCK_OP_RESPONSE,
VSOCK_OP_RST, VSOCK_OP_RW, VSOCK_OP_SHUTDOWN, VSOCK_TYPE_STREAM,
},
vhu_vsock_thread::VhostUserVsockThread,
};
#[derive(Debug)]
pub(crate) struct VsockConnection<S> {
/// Host-side stream corresponding to this vsock connection.
pub stream: S,
/// Specifies if the stream is connected to a listener on the host.
pub connect: bool,
/// Port at which a guest application is listening to.
pub peer_port: u32,
/// Queue holding pending rx operations per connection.
pub rx_queue: RxQueue,
/// CID of the host.
local_cid: u64,
/// Port on the host at which a host-side application listens to.
pub local_port: u32,
/// CID of the guest.
pub guest_cid: u64,
/// Total number of bytes written to stream from tx buffer.
pub fwd_cnt: Wrapping<u32>,
/// Total number of bytes previously forwarded to stream.
last_fwd_cnt: Wrapping<u32>,
/// Size of buffer the guest has allocated for this connection.
peer_buf_alloc: u32,
/// Number of bytes the peer has forwarded to a connection.
peer_fwd_cnt: Wrapping<u32>,
/// The total number of bytes sent to the guest vsock driver.
rx_cnt: Wrapping<u32>,
/// epoll fd to which this connection's stream has to be added.
pub epoll_fd: RawFd,
/// Local tx buffer.
pub tx_buf: LocalTxBuf,
/// Local tx buffer size
tx_buffer_size: u32,
}
impl<S: AsRawFd + Read + Write> VsockConnection<S> {
/// Create a new vsock connection object for locally i.e host-side
/// inititated connections.
pub fn new_local_init(
stream: S,
local_cid: u64,
local_port: u32,
guest_cid: u64,
guest_port: u32,
epoll_fd: RawFd,
tx_buffer_size: u32,
) -> Self {
Self {
stream,
connect: false,
peer_port: guest_port,
rx_queue: RxQueue::new(),
local_cid,
local_port,
guest_cid,
fwd_cnt: Wrapping(0),
last_fwd_cnt: Wrapping(0),
peer_buf_alloc: 0,
peer_fwd_cnt: Wrapping(0),
rx_cnt: Wrapping(0),
epoll_fd,
tx_buf: LocalTxBuf::new(tx_buffer_size),
tx_buffer_size,
}
}
/// Create a new vsock connection object for connections initiated by
/// an application running in the guest.
#[allow(clippy::too_many_arguments)]
pub fn new_peer_init(
stream: S,
local_cid: u64,
local_port: u32,
guest_cid: u64,
guest_port: u32,
epoll_fd: RawFd,
peer_buf_alloc: u32,
tx_buffer_size: u32,
) -> Self {
let mut rx_queue = RxQueue::new();
rx_queue.enqueue(RxOps::Response);
Self {
stream,
connect: false,
peer_port: guest_port,
rx_queue,
local_cid,
local_port,
guest_cid,
fwd_cnt: Wrapping(0),
last_fwd_cnt: Wrapping(0),
peer_buf_alloc,
peer_fwd_cnt: Wrapping(0),
rx_cnt: Wrapping(0),
epoll_fd,
tx_buf: LocalTxBuf::new(tx_buffer_size),
tx_buffer_size,
}
}
/// Set the peer port to the guest side application's port.
pub fn set_peer_port(&mut self, peer_port: u32) {
self.peer_port = peer_port;
}
/// Process a vsock packet that is meant for this connection.
/// Forward data to the host-side application if the vsock packet
/// contains a RW operation.
pub fn recv_pkt<B: BitmapSlice>(&mut self, pkt: &mut VsockPacket<B>) -> Result<()> {
// Initialize all fields in the packet header
self.init_pkt(pkt);
match self.rx_queue.dequeue() {
Some(RxOps::Request) => {
// Send a connection request to the guest-side application
pkt.set_op(VSOCK_OP_REQUEST);
Ok(())
}
Some(RxOps::Rw) => {
if !self.connect {
// There is no host-side application listening for this
// packet, hence send back an RST.
pkt.set_op(VSOCK_OP_RST);
return Ok(());
}
// Check if peer has space for receiving data
if self.need_credit_update_from_peer() {
self.last_fwd_cnt = self.fwd_cnt;
pkt.set_op(VSOCK_OP_CREDIT_REQUEST);
return Ok(());
}
let buf = pkt.data_slice().ok_or(Error::PktBufMissing)?;
// Perform a credit check to find the maximum read size. The read
// data must fit inside a packet buffer and be within peer's
// available buffer space
let max_read_len = std::cmp::min(buf.len(), self.peer_avail_credit());
// Read data from the stream directly into the buffer
if let Ok(read_cnt) = buf.read_from(0, &mut self.stream, max_read_len) {
if read_cnt == 0 {
// If no data was read then the stream was closed down unexpectedly.
// Send a shutdown packet to the guest-side application.
pkt.set_op(VSOCK_OP_SHUTDOWN)
.set_flag(VSOCK_FLAGS_SHUTDOWN_RCV)
.set_flag(VSOCK_FLAGS_SHUTDOWN_SEND);
} else {
// If data was read, then set the length field in the packet header
// to the amount of data that was read.
pkt.set_op(VSOCK_OP_RW).set_len(read_cnt as u32);
// Re-register the stream file descriptor for read and write events
if VhostUserVsockThread::epoll_modify(
self.epoll_fd,
self.stream.as_raw_fd(),
epoll::Events::EPOLLIN | epoll::Events::EPOLLOUT,
)
.is_err()
{
if let Err(e) = VhostUserVsockThread::epoll_register(
self.epoll_fd,
self.stream.as_raw_fd(),
epoll::Events::EPOLLIN | epoll::Events::EPOLLOUT,
) {
// TODO: let's move this logic out of this func, and handle it properly
error!("epoll_register failed: {:?}, but proceed further.", e);
}
};
}
// Update the rx_cnt with the amount of data in the vsock packet.
self.rx_cnt += Wrapping(pkt.len());
self.last_fwd_cnt = self.fwd_cnt;
}
Ok(())
}
Some(RxOps::Response) => {
// A response has been received to a newly initiated host-side connection
self.connect = true;
pkt.set_op(VSOCK_OP_RESPONSE);
Ok(())
}
Some(RxOps::CreditUpdate) => {
// Request credit update from the guest.
if !self.rx_queue.pending_rx() {
// Waste an rx buffer if no rx is pending
pkt.set_op(VSOCK_OP_CREDIT_UPDATE);
self.last_fwd_cnt = self.fwd_cnt;
}
Ok(())
}
_ => Err(Error::NoRequestRx),
}
}
/// Deliver a guest generated packet to this connection.
///
/// Returns:
/// - always `Ok(())` to indicate that the packet has been consumed
pub fn send_pkt<B: BitmapSlice>(&mut self, pkt: &VsockPacket<B>) -> Result<()> {
// Update peer credit information
self.peer_buf_alloc = pkt.buf_alloc();
self.peer_fwd_cnt = Wrapping(pkt.fwd_cnt());
match pkt.op() {
VSOCK_OP_RESPONSE => {
// Confirmation for a host initiated connection
// TODO: Handle stream write error in a better manner
let response = format!("OK {}\n", self.peer_port);
self.stream.write_all(response.as_bytes()).unwrap();
self.connect = true;
}
VSOCK_OP_RW => {
// Data has to be written to the host-side stream
match pkt.data_slice() {
None => {
info!(
"Dropping empty packet from guest (lp={}, pp={})",
self.local_port, self.peer_port
);
return Ok(());
}
Some(buf) => {
if let Err(err) = self.send_bytes(buf) {
// TODO: Terminate this connection
dbg!("err:{:?}", err);
return Ok(());
}
}
}
}
VSOCK_OP_CREDIT_UPDATE => {
// Already updated the credit
// Re-register the stream file descriptor for read and write events
if VhostUserVsockThread::epoll_modify(
self.epoll_fd,
self.stream.as_raw_fd(),
epoll::Events::EPOLLIN | epoll::Events::EPOLLOUT,
)
.is_err()
{
if let Err(e) = VhostUserVsockThread::epoll_register(
self.epoll_fd,
self.stream.as_raw_fd(),
epoll::Events::EPOLLIN | epoll::Events::EPOLLOUT,
) {
// TODO: let's move this logic out of this func, and handle it properly
error!("epoll_register failed: {:?}, but proceed further.", e);
}
};
}
VSOCK_OP_CREDIT_REQUEST => {
// Send back this connection's credit information
self.rx_queue.enqueue(RxOps::CreditUpdate);
}
VSOCK_OP_SHUTDOWN => {
// Shutdown this connection
let recv_off = pkt.flags() & VSOCK_FLAGS_SHUTDOWN_RCV != 0;
let send_off = pkt.flags() & VSOCK_FLAGS_SHUTDOWN_SEND != 0;
if recv_off && send_off && self.tx_buf.is_empty() {
self.rx_queue.enqueue(RxOps::Reset);
}
}
_ => {}
}
Ok(())
}
/// Write data to the host-side stream.
///
/// Returns:
/// - Ok(cnt) where cnt is the number of bytes written to the stream
/// - Err(Error::UnixWrite) if there was an error writing to the stream
fn send_bytes<B: BitmapSlice>(&mut self, buf: &VolatileSlice<B>) -> Result<()> {
if !self.tx_buf.is_empty() {
// Data is already present in the buffer and the backend
// is waiting for a EPOLLOUT event to flush it
return self.tx_buf.push(buf);
}
// Write data to the stream
let written_count = match buf.write_to(0, &mut self.stream, buf.len()) {
Ok(cnt) => cnt,
Err(vm_memory::VolatileMemoryError::IOError(e)) => {
if e.kind() == ErrorKind::WouldBlock {
0
} else {
dbg!("send_bytes error: {:?}", e);
return Err(Error::UnixWrite);
}
}
Err(e) => {
dbg!("send_bytes error: {:?}", e);
return Err(Error::UnixWrite);
}
};
if written_count > 0 {
// Increment forwarded count by number of bytes written to the stream
self.fwd_cnt += Wrapping(written_count as u32);
// At what point in available credits should we send a credit update.
// This is set to 1/4th of the tx buffer size. If we keep it too low,
// we will end up sending too many credit updates. If we keep it too
// high, we will end up sending too few credit updates and cause stalls.
// Stalls are more bad than too many credit updates.
let free_space = self
.tx_buffer_size
.wrapping_sub((self.fwd_cnt - self.last_fwd_cnt).0);
if free_space < self.tx_buffer_size / 4 {
self.rx_queue.enqueue(RxOps::CreditUpdate);
}
}
if written_count != buf.len() {
return self.tx_buf.push(&buf.offset(written_count).unwrap());
}
Ok(())
}
/// Initialize all header fields in the vsock packet.
fn init_pkt<'a, 'b, B: BitmapSlice>(
&self,
pkt: &'a mut VsockPacket<'b, B>,
) -> &'a mut VsockPacket<'b, B> {
// Zero out the packet header
pkt.set_header_from_raw(&[0u8; PKT_HEADER_SIZE]).unwrap();
pkt.set_src_cid(self.local_cid)
.set_dst_cid(self.guest_cid)
.set_src_port(self.local_port)
.set_dst_port(self.peer_port)
.set_type(VSOCK_TYPE_STREAM)
.set_buf_alloc(self.tx_buffer_size)
.set_fwd_cnt(self.fwd_cnt.0)
}
/// Get max number of bytes we can send to peer without overflowing
/// the peer's buffer.
fn peer_avail_credit(&self) -> usize {
(Wrapping(self.peer_buf_alloc) - (self.rx_cnt - self.peer_fwd_cnt)).0 as usize
}
/// Check if we need a credit update from the peer before sending
/// more data to it.
fn need_credit_update_from_peer(&self) -> bool {
self.peer_avail_credit() == 0
}
}
#[cfg(test)]
mod tests {
use byteorder::{ByteOrder, LittleEndian};
use super::*;
use crate::vhu_vsock::{VSOCK_HOST_CID, VSOCK_OP_RW, VSOCK_TYPE_STREAM};
use std::io::Result as IoResult;
use std::ops::Deref;
use virtio_bindings::bindings::virtio_ring::{VRING_DESC_F_NEXT, VRING_DESC_F_WRITE};
use virtio_queue::{mock::MockSplitQueue, Descriptor, DescriptorChain, Queue, QueueOwnedT};
use vm_memory::{
Address, Bytes, GuestAddress, GuestAddressSpace, GuestMemoryAtomic, GuestMemoryLoadGuard,
GuestMemoryMmap,
};
const CONN_TX_BUF_SIZE: u32 = 64 * 1024;
struct HeadParams {
head_len: usize,
data_len: u32,
}
impl HeadParams {
fn new(head_len: usize, data_len: u32) -> Self {
Self { head_len, data_len }
}
fn construct_head(&self) -> Vec<u8> {
let mut header = vec![0_u8; self.head_len];
if self.head_len == PKT_HEADER_SIZE {
// Offset into the header for data length
const HDROFF_LEN: usize = 24;
LittleEndian::write_u32(&mut header[HDROFF_LEN..], self.data_len);
}
header
}
}
fn prepare_desc_chain_vsock(
write_only: bool,
head_params: &HeadParams,
data_chain_len: u16,
head_data_len: u32,
) -> (
GuestMemoryAtomic<GuestMemoryMmap>,
DescriptorChain<GuestMemoryLoadGuard<GuestMemoryMmap>>,
) {
let mem = GuestMemoryMmap::<()>::from_ranges(&[(GuestAddress(0), 0x1000)]).unwrap();
let virt_queue = MockSplitQueue::new(&mem, 16);
let mut next_addr = virt_queue.desc_table().total_size() + 0x100;
let mut flags = 0;
if write_only {
flags |= VRING_DESC_F_WRITE;
}
let mut head_flags = if data_chain_len > 0 {
flags | VRING_DESC_F_NEXT
} else {
flags
};
// vsock packet header
// let header = vec![0 as u8; head_params.head_len];
let header = head_params.construct_head();
let head_desc =
Descriptor::new(next_addr, head_params.head_len as u32, head_flags as u16, 1);
mem.write(&header, head_desc.addr()).unwrap();
assert!(virt_queue.desc_table().store(0, head_desc).is_ok());
next_addr += head_params.head_len as u64;
// Put the descriptor index 0 in the first available ring position.
mem.write_obj(0u16, virt_queue.avail_addr().unchecked_add(4))
.unwrap();
// Set `avail_idx` to 1.
mem.write_obj(1u16, virt_queue.avail_addr().unchecked_add(2))
.unwrap();
// chain len excludes the head
for i in 0..(data_chain_len) {
// last descr in chain
if i == data_chain_len - 1 {
head_flags &= !VRING_DESC_F_NEXT;
}
// vsock data
let data = vec![0_u8; head_data_len as usize];
let data_desc = Descriptor::new(next_addr, data.len() as u32, head_flags as u16, i + 2);
mem.write(&data, data_desc.addr()).unwrap();
assert!(virt_queue.desc_table().store(i + 1, data_desc).is_ok());
next_addr += head_data_len as u64;
}
// Create descriptor chain from pre-filled memory
(
GuestMemoryAtomic::new(mem.clone()),
virt_queue
.create_queue::<Queue>()
.unwrap()
.iter(GuestMemoryAtomic::new(mem.clone()).memory())
.unwrap()
.next()
.unwrap(),
)
}
struct VsockDummySocket {
data: Vec<u8>,
}
impl VsockDummySocket {
fn new() -> Self {
Self { data: Vec::new() }
}
}
impl Write for VsockDummySocket {
fn write(&mut self, buf: &[u8]) -> std::result::Result<usize, std::io::Error> {
self.data.clear();
self.data.extend_from_slice(buf);
Ok(buf.len())
}
fn flush(&mut self) -> IoResult<()> {
Ok(())
}
}
impl Read for VsockDummySocket {
fn read(&mut self, buf: &mut [u8]) -> IoResult<usize> {
buf[..self.data.len()].copy_from_slice(&self.data);
Ok(self.data.len())
}
}
impl AsRawFd for VsockDummySocket {
fn as_raw_fd(&self) -> RawFd {
-1
}
}
#[test]
fn test_vsock_conn_init() {
// new locally inititated connection
let dummy_file = VsockDummySocket::new();
let mut conn_local = VsockConnection::new_local_init(
dummy_file,
VSOCK_HOST_CID,
5000,
3,
5001,
-1,
CONN_TX_BUF_SIZE,
);
assert!(!conn_local.connect);
assert_eq!(conn_local.peer_port, 5001);
assert_eq!(conn_local.rx_queue, RxQueue::new());
assert_eq!(conn_local.local_cid, VSOCK_HOST_CID);
assert_eq!(conn_local.local_port, 5000);
assert_eq!(conn_local.guest_cid, 3);
// set peer port
conn_local.set_peer_port(5002);
assert_eq!(conn_local.peer_port, 5002);
// New connection initiated by the peer/guest
let dummy_file = VsockDummySocket::new();
let mut conn_peer = VsockConnection::new_peer_init(
dummy_file,
VSOCK_HOST_CID,
5000,
3,
5001,
-1,
65536,
CONN_TX_BUF_SIZE,
);
assert!(!conn_peer.connect);
assert_eq!(conn_peer.peer_port, 5001);
assert_eq!(conn_peer.rx_queue.dequeue().unwrap(), RxOps::Response);
assert!(!conn_peer.rx_queue.pending_rx());
assert_eq!(conn_peer.local_cid, VSOCK_HOST_CID);
assert_eq!(conn_peer.local_port, 5000);
assert_eq!(conn_peer.guest_cid, 3);
assert_eq!(conn_peer.peer_buf_alloc, 65536);
}
#[test]
fn test_vsock_conn_credit() {
// new locally inititated connection
let dummy_file = VsockDummySocket::new();
let mut conn_local = VsockConnection::new_local_init(
dummy_file,
VSOCK_HOST_CID,
5000,
3,
5001,
-1,
CONN_TX_BUF_SIZE,
);
assert_eq!(conn_local.peer_avail_credit(), 0);
assert!(conn_local.need_credit_update_from_peer());
conn_local.peer_buf_alloc = 65536;
assert_eq!(conn_local.peer_avail_credit(), 65536);
assert!(!conn_local.need_credit_update_from_peer());
conn_local.rx_cnt = Wrapping(32768);
assert_eq!(conn_local.peer_avail_credit(), 32768);
assert!(!conn_local.need_credit_update_from_peer());
conn_local.rx_cnt = Wrapping(65536);
assert_eq!(conn_local.peer_avail_credit(), 0);
assert!(conn_local.need_credit_update_from_peer());
}
#[test]
fn test_vsock_conn_init_pkt() {
// parameters for packet head construction
let head_params = HeadParams::new(PKT_HEADER_SIZE, 10);
// new locally inititated connection
let dummy_file = VsockDummySocket::new();
let conn_local = VsockConnection::new_local_init(
dummy_file,
VSOCK_HOST_CID,
5000,
3,
5001,
-1,
CONN_TX_BUF_SIZE,
);
// write only descriptor chain
let (mem, mut descr_chain) = prepare_desc_chain_vsock(true, &head_params, 2, 10);
let mem = mem.memory();
let mut pkt =
VsockPacket::from_rx_virtq_chain(mem.deref(), &mut descr_chain, CONN_TX_BUF_SIZE)
.unwrap();
// initialize a vsock packet for the guest
conn_local.init_pkt(&mut pkt);
assert_eq!(pkt.src_cid(), VSOCK_HOST_CID);
assert_eq!(pkt.dst_cid(), 3);
assert_eq!(pkt.src_port(), 5000);
assert_eq!(pkt.dst_port(), 5001);
assert_eq!(pkt.type_(), VSOCK_TYPE_STREAM);
assert_eq!(pkt.buf_alloc(), CONN_TX_BUF_SIZE);
assert_eq!(pkt.fwd_cnt(), 0);
}
#[test]
fn test_vsock_conn_recv_pkt() {
// parameters for packet head construction
let head_params = HeadParams::new(PKT_HEADER_SIZE, 5);
// new locally inititated connection
let dummy_file = VsockDummySocket::new();
let mut conn_local = VsockConnection::new_local_init(
dummy_file,
VSOCK_HOST_CID,
5000,
3,
5001,
-1,
CONN_TX_BUF_SIZE,
);
// write only descriptor chain
let (mem, mut descr_chain) = prepare_desc_chain_vsock(true, &head_params, 1, 5);
let mem = mem.memory();
let mut pkt =
VsockPacket::from_rx_virtq_chain(mem.deref(), &mut descr_chain, CONN_TX_BUF_SIZE)
.unwrap();
// VSOCK_OP_REQUEST: new local conn request
conn_local.rx_queue.enqueue(RxOps::Request);
let op_req = conn_local.recv_pkt(&mut pkt);
assert!(op_req.is_ok());
assert!(!conn_local.rx_queue.pending_rx());
assert_eq!(pkt.op(), VSOCK_OP_REQUEST);
// VSOCK_OP_RST: reset if connection not established
conn_local.rx_queue.enqueue(RxOps::Rw);
let op_rst = conn_local.recv_pkt(&mut pkt);
assert!(op_rst.is_ok());
assert!(!conn_local.rx_queue.pending_rx());
assert_eq!(pkt.op(), VSOCK_OP_RST);
// VSOCK_OP_CREDIT_UPDATE: need credit update from peer/guest
conn_local.connect = true;
conn_local.rx_queue.enqueue(RxOps::Rw);
conn_local.fwd_cnt = Wrapping(1024);
let op_credit_update = conn_local.recv_pkt(&mut pkt);
assert!(op_credit_update.is_ok());
assert!(!conn_local.rx_queue.pending_rx());
assert_eq!(pkt.op(), VSOCK_OP_CREDIT_REQUEST);
assert_eq!(conn_local.last_fwd_cnt, Wrapping(1024));
// VSOCK_OP_SHUTDOWN: zero data read from stream/file
conn_local.peer_buf_alloc = 65536;
conn_local.rx_queue.enqueue(RxOps::Rw);
let op_zero_read_shutdown = conn_local.recv_pkt(&mut pkt);
assert!(op_zero_read_shutdown.is_ok());
assert!(!conn_local.rx_queue.pending_rx());
assert_eq!(conn_local.rx_cnt, Wrapping(0));
assert_eq!(conn_local.last_fwd_cnt, Wrapping(1024));
assert_eq!(pkt.op(), VSOCK_OP_SHUTDOWN);
assert_eq!(
pkt.flags(),
VSOCK_FLAGS_SHUTDOWN_RCV | VSOCK_FLAGS_SHUTDOWN_SEND
);
// VSOCK_OP_RW: finite data read from stream/file
let payload = b"hello";
conn_local.stream.write_all(payload).unwrap();
conn_local.rx_queue.enqueue(RxOps::Rw);
let op_zero_read = conn_local.recv_pkt(&mut pkt);
assert!(op_zero_read.is_ok());
assert_eq!(pkt.op(), VSOCK_OP_RW);
assert!(!conn_local.rx_queue.pending_rx());
assert_eq!(conn_local.rx_cnt, Wrapping(payload.len() as u32));
assert_eq!(conn_local.last_fwd_cnt, Wrapping(1024));
assert_eq!(pkt.len(), 5);
let buf = &mut [0u8; 5];
assert!(pkt.data_slice().unwrap().read_slice(buf, 0).is_ok());
assert_eq!(buf, b"hello");
// VSOCK_OP_RESPONSE: response from a locally initiated connection
conn_local.rx_queue.enqueue(RxOps::Response);
let op_response = conn_local.recv_pkt(&mut pkt);
assert!(op_response.is_ok());
assert!(!conn_local.rx_queue.pending_rx());
assert_eq!(pkt.op(), VSOCK_OP_RESPONSE);
assert!(conn_local.connect);
// VSOCK_OP_CREDIT_UPDATE: guest needs credit update
conn_local.rx_queue.enqueue(RxOps::CreditUpdate);
let op_credit_update = conn_local.recv_pkt(&mut pkt);
assert!(!conn_local.rx_queue.pending_rx());
assert!(op_credit_update.is_ok());
assert_eq!(pkt.op(), VSOCK_OP_CREDIT_UPDATE);
assert_eq!(conn_local.last_fwd_cnt, Wrapping(1024));
// non-existent request
let op_error = conn_local.recv_pkt(&mut pkt);
assert!(op_error.is_err());
}
#[test]
fn test_vsock_conn_send_pkt() {
// parameters for packet head construction
let head_params = HeadParams::new(PKT_HEADER_SIZE, 5);
// new locally inititated connection
let dummy_file = VsockDummySocket::new();
let mut conn_local = VsockConnection::new_local_init(
dummy_file,
VSOCK_HOST_CID,
5000,
3,
5001,
-1,
CONN_TX_BUF_SIZE,
);
// write only descriptor chain
let (mem, mut descr_chain) = prepare_desc_chain_vsock(false, &head_params, 1, 5);
let mem = mem.memory();
let mut pkt =
VsockPacket::from_tx_virtq_chain(mem.deref(), &mut descr_chain, CONN_TX_BUF_SIZE)
.unwrap();
// peer credit information
pkt.set_buf_alloc(65536).set_fwd_cnt(1024);
// check if peer credit information is updated currently
let credit_check = conn_local.send_pkt(&pkt);
assert!(credit_check.is_ok());
assert_eq!(conn_local.peer_buf_alloc, 65536);
assert_eq!(conn_local.peer_fwd_cnt, Wrapping(1024));
// VSOCK_OP_RESPONSE
pkt.set_op(VSOCK_OP_RESPONSE);
let peer_response = conn_local.send_pkt(&pkt);
assert!(peer_response.is_ok());
assert!(conn_local.connect);
let mut resp_buf = vec![0; 8];
conn_local.stream.read_exact(&mut resp_buf).unwrap();
assert_eq!(resp_buf, b"OK 5001\n");
// VSOCK_OP_RW
pkt.set_op(VSOCK_OP_RW);
let buf = b"hello";
assert!(pkt.data_slice().unwrap().write_slice(buf, 0).is_ok());
let rw_response = conn_local.send_pkt(&pkt);
assert!(rw_response.is_ok());
let mut resp_buf = vec![0; 5];
conn_local.stream.read_exact(&mut resp_buf).unwrap();
assert_eq!(resp_buf, b"hello");
// VSOCK_OP_CREDIT_REQUEST
pkt.set_op(VSOCK_OP_CREDIT_REQUEST);
let credit_response = conn_local.send_pkt(&pkt);
assert!(credit_response.is_ok());
assert_eq!(conn_local.rx_queue.peek().unwrap(), RxOps::CreditUpdate);
// VSOCK_OP_SHUTDOWN
pkt.set_op(VSOCK_OP_SHUTDOWN);
pkt.set_flags(VSOCK_FLAGS_SHUTDOWN_RCV | VSOCK_FLAGS_SHUTDOWN_SEND);
let shutdown_response = conn_local.send_pkt(&pkt);
assert!(shutdown_response.is_ok());
assert!(conn_local.rx_queue.contains(RxOps::Reset.bitmask()));
}
}