Towards Fast IO on Linux using Rust

We will compare several different ways of reading a file using Rust. Apart from "wc -l", each function is run 10 times using criterion and the mean is reported.

Code for the following benchmarks lives at Benchmark code for Linux IO using Rust. In the code that follows, BUFFER_SIZE was 8192 and NUM_BUFFERS was 32.
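
For context, the harness looks roughly like the sketch below, using count_newlines_standard from Method 1 as the example. This is illustrative, not the exact contents of the benchmark repo; the benchmark name and the "data" path are placeholders.

use criterion::{criterion_group, criterion_main, Criterion};

// Sketch of the criterion setup described above: 10 samples per function,
// criterion reports the mean.
fn bench_count_newlines(c: &mut Criterion) {
    let mut group = c.benchmark_group("count_newlines");
    group.sample_size(10);
    group.bench_function("bufreader_lines", |b| {
        b.iter(|| count_newlines_standard("data").unwrap())
    });
    group.finish();
}

criterion_group!(benches, bench_count_newlines);
criterion_main!(benches);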

Details about the machine

  1. Framework 16 with a Ryzen 7 7840HS and 64 GB of RAM. Power plugged in and performance mode enabled.
  2. SSD: WD_BLACK SN850X 4000GB. A GNOME Disks benchmark shows a read speed of 3.6 GB/s (sample size 1000 MB, 100 samples).
  3. Filesystem: btrfs
  4. Uname string: Linux fedora 6.8.8-300.fc40.x86_64 #1 SMP PREEMPT_DYNAMIC Sat Apr 27 17:53:31 UTC 2024 x86_64 GNU/Linux

Details about the text file

Uncompressed size: 22G
Number of lines: 200,000,000
Compressed size after btrfs compression (zstd): 5.3G

For the impatient: an overview of the results

Method                           Time (seconds)
Mmap with AVX512                    2.61
Mmap with AVX2                      2.64
io_uring with Vectored IO           2.86
Vectored IO                         2.89
Mmap                                3.43
io_uring                            5.26
wc -l (baseline)                    8.01
Direct IO                          10.56
BufReader without appends          15.94
BufReader with lines().count()     33.50

Benchmark results

An interesting observation: the AVX512 version takes 2.61 seconds, but the file is ~22G and the SSD benchmarks at 3.6 GB/s, so just reading the bytes should take about 6 seconds. In other words, the AVX512 implementation appears to be reading at about 8.4 GB/s. What gives? It turns out Fedora uses btrfs with zstd compression enabled by default, so only about 5.3G actually comes off the disk (roughly 2 GB/s over 2.61 seconds, comfortably within what the SSD can do). The actual on-disk size can be found using compsize.

opdroid@box:~/tmp$ sudo compsize data 
Processed 1 file, 177437 regular extents (177437 refs), 0 inline.
Type       Perc     Disk Usage   Uncompressed Referenced  
TOTAL       24%      5.3G          21G          21G       
none       100%       32K          32K          32K       
zstd        24%      5.3G          21G          21G

Thanks to these fine folks

  1. @alextjensen - for pointing me to sane defaults for BufReader and to compiling for the native arch (see the snippet after this list).
  2. @aepau2 - for spotting a glaring error in the wc numbers: I had forgotten to drop the cache before measuring with wc.
  3. @rflaherty71 - for suggesting more, larger buffers (64 x 64k).
  4. @daniel_c0deb0t - for suggesting larger buffers.
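
Compiling for the native architecture is what lets the AVX2/AVX512 paths below be used at all. A sketch of one standard Cargo way to do it (not necessarily how the benchmark repo is configured):

  # One-off: pass the flag for a single run
  RUSTFLAGS="-C target-cpu=native" cargo bench

  # Or persistently, via .cargo/config.toml:
  # [build]
  # rustflags = ["-C", "target-cpu=native"]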

It is always a good idea to use some code we did not write as a baseline.

Baseline: wc -l

  opdroid@box:~/tmp$ time wc -l data
  200000000 data

  real	0m8.010s
  user	0m0.193s
  sys	0m7.591s

We reset the file caches using the following command at the end of each function. I have yet to figure out how to use a teardown function in criterion so that this doesn't get counted in the time taken.

// TODO: move to a teardown function in criterion
fn reset_file_caches() {
    // Execute the command to reset file caches
    let output = Command::new("sudo")
        .arg("sh")
        .arg("-c")
        .arg("echo 3 > /proc/sys/vm/drop_caches")
        .output()
        .expect("Failed to reset file caches");

    // Check if the command executed successfully
    if !output.status.success() {
        panic!("Failed to reset file caches: {:?}", output);
    }
}
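
One possible approach, assuming criterion's iter_batched (I have not wired this into the benchmark repo): do the cache drop in the setup closure, which criterion excludes from the measurement. Dropping before each iteration is equivalent to dropping after the previous one for this purpose.

use criterion::{BatchSize, Criterion};

// Sketch: the setup closure is not timed, so the cache drop stays out of the
// measurement; only the routine closure is counted.
fn bench_cold_cache(c: &mut Criterion) {
    c.bench_function("count_newlines_cold", |b| {
        b.iter_batched(
            || reset_file_caches(),
            |_| count_newlines_standard("data").unwrap(),
            BatchSize::PerIteration,
        )
    });
}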

Method 1: Read the file using BufReader and use reader.lines().count()

fn count_newlines_standard(filename: &str) -> Result<usize, std::io::Error> {
    let file = File::open(filename)?;
    let reader = BufReader::with_capacity(16 * 1024, file);

    let newline_count = reader.lines().count();

    reset_file_caches();
    Ok(newline_count)
}

This takes about 36.5 seconds on my machine. lines() hands back an owned String for every line, so we pay for an allocation and a UTF-8 check 200 million times.

/images/flamegraph_count_newlines_standard.png

Method 2: Read the file using BufReader and avoid string appends

pub fn count_newlines_standard_non_appending(filename: &str) -> Result<usize, std::io::Error> {
    let file = File::open(filename)?;
    let mut reader = BufReader::with_capacity(64 * 1024, file);
    let mut newline_count = 0;

    loop {
        let len = {
            let buffer = reader.fill_buf()?;
            if buffer.is_empty() {
                break;
            }
            newline_count += buffer.iter().filter(|&&b| b == b'\n').count();
            buffer.len()
        };

        reader.consume(len);
    }

    reset_file_caches();
    Ok(newline_count)
}

This takes about 15.94 seconds on my machine, less than half the time of the appending version.

When we look at the flamegraph we can verify that the appends are gone.

/images/flamegraph_count_newlines_standard_non_appending.svg

Method 3: Read the file using Direct IO

fn count_newlines_direct_io(filename: &str) -> Result<usize, Error> {
    let mut open_options = File::options();
    open_options.read(true).custom_flags(libc::O_DIRECT);

    let mut file = open_options.open(filename)?;
    let mut buffer = vec![0; BUFFER_SIZE];
    let mut newline_count = 0;

    loop {
        let bytes_read = file.read(&mut buffer)?;
        if bytes_read == 0 {
            break;
        }

        let chunk_newline_count = buffer[..bytes_read].iter().filter(|&&b| b == b'\n').count();
        newline_count += chunk_newline_count;
    }
    reset_file_caches();
    Ok(newline_count)
}

This takes about 35.7 seconds on my machine.
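
One caveat with O_DIRECT: it generally requires the buffer, file offset, and read length to be aligned to the device's logical block size, and a plain Vec<u8> gives no alignment guarantee. A sketch of one way to get an aligned buffer (the 4096-byte alignment and the helper itself are assumptions for illustration, not part of the benchmark code):

use std::alloc::{alloc_zeroed, dealloc, Layout};

// Assumed alignment; the real requirement depends on the device and filesystem.
const ALIGN: usize = 4096;

// Hypothetical helper: a heap buffer aligned for O_DIRECT reads.
struct AlignedBuf {
    ptr: *mut u8,
    layout: Layout,
}

impl AlignedBuf {
    fn new(size: usize) -> Self {
        let layout = Layout::from_size_align(size, ALIGN).expect("invalid layout");
        let ptr = unsafe { alloc_zeroed(layout) };
        assert!(!ptr.is_null(), "allocation failed");
        AlignedBuf { ptr, layout }
    }

    fn as_mut_slice(&mut self) -> &mut [u8] {
        unsafe { std::slice::from_raw_parts_mut(self.ptr, self.layout.size()) }
    }
}

impl Drop for AlignedBuf {
    fn drop(&mut self) {
        unsafe { dealloc(self.ptr, self.layout) };
    }
}

file.read(aligned.as_mut_slice()) would then read into the aligned buffer; reads should also be issued in multiples of the block size.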

Method 4: Read the file using Mmap

fn count_newlines_memmap(filename: &str) -> Result<usize, Error> {
    let file = File::open(filename)?;
    let mmap = unsafe { Mmap::map(&file)? };

    let newline_count = mmap.iter().filter(|&&b| b == b'\n').count();
    reset_file_caches();
    Ok(newline_count)
}

This takes about 8.3 seconds on my machine.
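
A possible (unmeasured) tweak for the mmap variants: hint the kernel that the mapping will be read front to back, so it can read ahead more aggressively. A sketch using libc::madvise directly; the helper name is made up and any benefit here is an assumption, not something I benchmarked:

// Sketch: advise the kernel that `map` (the Mmap derefs to &[u8]) will be
// read sequentially. Returns true if the kernel accepted the hint.
fn advise_sequential(map: &[u8]) -> bool {
    unsafe {
        libc::madvise(
            map.as_ptr() as *mut libc::c_void,
            map.len(),
            libc::MADV_SEQUENTIAL,
        ) == 0
    }
}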

Method 5: Read the file using Mmap and AVX2

unsafe fn count_newlines_memmap_avx2(filename: &str) -> Result<usize, Error> {
    let file = File::open(filename)?;
    let mmap = unsafe { Mmap::map(&file)? };

    let newline_byte = b'\n';
    let newline_vector = _mm256_set1_epi8(newline_byte as i8);
    let mut newline_count = 0;

    let mut ptr = mmap.as_ptr();
    let end_ptr = unsafe { ptr.add(mmap.len()) };

    while ptr <= end_ptr.sub(32) {
        let data = unsafe { _mm256_loadu_si256(ptr as *const __m256i) };
        let cmp_result = _mm256_cmpeq_epi8(data, newline_vector);
        let mask = _mm256_movemask_epi8(cmp_result);
        newline_count += mask.count_ones() as usize;
        ptr = unsafe { ptr.add(32) };
    }

    // Count remaining bytes
    let remaining_bytes = end_ptr as usize - ptr as usize;
    newline_count += mmap[mmap.len() - remaining_bytes..].iter().filter(|&&b| b == newline_byte).count();

    reset_file_caches();
    Ok(newline_count)
}

This takes about 2.64 seconds on my machine.

Method 6: Read the file using Mmap and AVX512

unsafe fn count_newlines_memmap_avx512(filename: &str) -> Result<usize, Error> {
    let file = File::open(filename)?;
    let mmap = unsafe { Mmap::map(&file)? };

    let newline_byte = b'\n';
    let newline_vector = _mm512_set1_epi8(newline_byte as i8);
    let mut newline_count = 0;

    let mut ptr = mmap.as_ptr();
    let end_ptr = unsafe { ptr.add(mmap.len()) };

    while ptr <= end_ptr.sub(64) {
        let data = unsafe { _mm512_loadu_si512(ptr as *const i32) };
        let cmp_result = _mm512_cmpeq_epi8_mask(data, newline_vector);
        newline_count += cmp_result.count_ones() as usize;
        ptr = unsafe { ptr.add(64) };
    }

    // Count remaining bytes
    let remaining_bytes = end_ptr as usize - ptr as usize;
    newline_count += mmap[mmap.len() - remaining_bytes..].iter().filter(|&&b| b == newline_byte).count();

    reset_file_caches();
    Ok(newline_count)
}

This takes about 2.61 seconds on my machine.
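
Both SIMD variants are unsafe and silently assume the CPU supports the relevant extension. A sketch of how one might dispatch safely at runtime (this wrapper is illustrative; it is not how the benchmark selects a path):

// Sketch: pick the widest SIMD path the CPU supports, otherwise fall back to
// the plain mmap version.
fn count_newlines_fastest(filename: &str) -> Result<usize, std::io::Error> {
    #[cfg(target_arch = "x86_64")]
    {
        if is_x86_feature_detected!("avx512bw") {
            return unsafe { count_newlines_memmap_avx512(filename) };
        }
        if is_x86_feature_detected!("avx2") {
            return unsafe { count_newlines_memmap_avx2(filename) };
        }
    }
    count_newlines_memmap(filename)
}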

Method 7: Read the file using Vectored IO

fn count_newlines_vectored_io(path: &str) -> Result<usize, Error>  {
    let mut file = File::open(path)?;

    let mut buffers_: Vec<_> = (0..16).map(|_| vec![0; BUFFER_SIZE]).collect();
    let mut buffers: Vec<_> = buffers_.iter_mut().map(|buf| io::IoSliceMut::new(buf)).collect();

    let mut newline_count = 0;

    loop {
        let bytes_read = file.read_vectored(&mut buffers)?;
        if bytes_read == 0 {
            break;
        }

        // Calculate how many buffers were filled
        let filled_buffers = bytes_read / BUFFER_SIZE;

        // Process the fully filled buffers
        for buf in &buffers[..filled_buffers] {
            newline_count += buf.iter().filter(|&&b| b == b'\n').count();
        }

        // Handle the potentially partially filled last buffer
        if filled_buffers < buffers.len() {
            let last_buffer = &buffers[filled_buffers];
            let end = bytes_read % BUFFER_SIZE;
            newline_count += last_buffer[..end].iter().filter(|&&b| b == b'\n').count();
        }
    }
    Ok(newline_count)
}

This takes about 7.7 seconds on my machine. Each read_vectored call is a single readv(2) syscall that can fill all of the buffers at once, so far fewer syscalls are issued per byte read.

Method 8: Read the file using io_uring

fn count_lines_io_uring(path: &str) -> io::Result<usize> {
    let file = File::open(path)?;
    let fd = file.as_raw_fd();

    let mut ring = IoUring::new(8)?;
    let mut line_count = 0;
    let mut offset = 0;

    let mut buf = vec![0; 4096];
    let mut read_size = buf.len();

    loop {
        let mut sqe = opcode::Read::new(types::Fd(fd), buf.as_mut_ptr(), read_size as _)
            .offset(offset as _)
            .build()
            .user_data(line_count as _);

        unsafe {
            ring.submission()
                .push(&mut sqe)
                .expect("submission queue is full");
        }

        ring.submit_and_wait(1)?;

        let cqe = ring.completion().next().expect("completion queue is empty");

        let bytes_read = cqe.result() as usize;
        line_count = cqe.user_data() as usize;

        if bytes_read == 0 {
            break;
        }

        let data = &buf[..bytes_read];
        line_count += data.iter().filter(|&&b| b == b'\n').count();

        offset += bytes_read as u64;
        read_size = (buf.len() - (offset as usize % buf.len())) as usize;
    }
    Ok(line_count)
}

This takes about 10.5 seconds on my machine. Note that only a single 4 KB read is in flight at any time: we submit one SQE and wait for its completion before submitting the next, so the ring is effectively being used as a synchronous pread loop.

Method 9: Read the file using io_uring with vectored IO

fn count_lines_io_uring_vectored(path: &str) -> io::Result<usize> {
    let file = File::open(path)?;
    let fd = file.as_raw_fd();

    let mut ring = IoUring::new(NUM_BUFFERS as u32)?;
    let mut line_count = 0;
    let mut offset = 0;

    let mut buffers = vec![vec![0; 8192]; NUM_BUFFERS];
    let mut iovecs: Vec<iovec> = buffers
        .iter_mut()
        .map(|buf| iovec {
            iov_base: buf.as_mut_ptr() as *mut _,
            iov_len: buf.len(),
        })
        .collect();

    loop {
        let mut sqe = opcode::Readv::new(types::Fd(fd), iovecs.as_mut_ptr(), iovecs.len() as _)
            .offset(offset as _)
            .build()
            .user_data(0);

        unsafe {
            ring.submission()
                .push(&mut sqe)
                .expect("submission queue is full");
        }

        ring.submit_and_wait(1)?;

        let cqe = ring.completion().next().expect("completion queue is empty");
        let bytes_read = cqe.result() as usize;

        if bytes_read == 0 {
            break;
        }

        let mut buffer_line_count = 0;
        let mut remaining_bytes = bytes_read;
        for buf in &buffers[..iovecs.len()] {
            let buf_size = buf.len();
            let data_size = remaining_bytes.min(buf_size);
            let data = &buf[..data_size];
            buffer_line_count += data.iter().filter(|&&b| b == b'\n').count();
            remaining_bytes -= data_size;
            if remaining_bytes == 0 {
                break;
            }
        }
        line_count += buffer_line_count;

        offset += bytes_read as u64;
    }

    Ok(line_count)
}

This takes about 7.6 seconds on my machine.