block_client/lib.rs

// Copyright 2020 The Fuchsia Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.
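
//! A library for talking to Fuchsia block devices over the `fuchsia.hardware.block` FIDL
//! protocol and its associated FIFO transport.
//!
//! A minimal usage sketch (assumes `proxy` is a connected
//! `fidl_fuchsia_hardware_block::BlockProxy` and that the device's block size divides 8192;
//! error handling elided):
//!
//! ```ignore
//! let client = RemoteBlockClient::new(proxy).await?;
//! let vmo = zx::Vmo::create(8192)?;
//! let vmo_id = client.attach_vmo(&vmo).await?;
//! client.write_at(BufferSlice::new_with_vmo_id(&vmo_id, 0, 8192), 0).await?;
//! client.detach_vmo(vmo_id).await?;
//! ```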

use async_trait::async_trait;
use fidl::endpoints::ServerEnd;
use fidl_fuchsia_hardware_block::{BlockProxy, MAX_TRANSFER_UNBOUNDED};
use fidl_fuchsia_hardware_block_partition::PartitionProxy;
use fidl_fuchsia_hardware_block_volume::VolumeProxy;
use fuchsia_sync::Mutex;
use futures::channel::oneshot;
use futures::executor::block_on;
use lazy_static::lazy_static;
use std::collections::HashMap;
use std::future::Future;
use std::hash::{Hash, Hasher};
use std::mem::MaybeUninit;
use std::num::NonZero;
use std::ops::{DerefMut, Range};
use std::pin::Pin;
use std::sync::atomic::{AtomicU16, Ordering};
use std::sync::Arc;
use std::task::{Context, Poll, Waker};
use zx::sys::zx_handle_t;
use zx::{self as zx, HandleBased as _};
use {
    fidl_fuchsia_hardware_block as block, fidl_fuchsia_hardware_block_driver as block_driver,
    fuchsia_async as fasync, storage_trace as trace,
};

pub use cache::Cache;

pub use block::Flag as BlockFlags;

pub use block_protocol::*;

pub mod cache;

const TEMP_VMO_SIZE: usize = 65536;

/// If a trace flow ID isn't specified for requests, one will be generated.
pub const NO_TRACE_ID: u64 = 0;

pub use block_driver::{BlockIoFlag, BlockOpcode};

fn fidl_to_status(error: fidl::Error) -> zx::Status {
    match error {
        fidl::Error::ClientChannelClosed { status, .. } => status,
        _ => zx::Status::INTERNAL,
    }
}

fn opcode_str(opcode: u8) -> &'static str {
    match BlockOpcode::from_primitive(opcode) {
        Some(BlockOpcode::Read) => "read",
        Some(BlockOpcode::Write) => "write",
        Some(BlockOpcode::Flush) => "flush",
        Some(BlockOpcode::Trim) => "trim",
        Some(BlockOpcode::CloseVmo) => "close_vmo",
        None => "unknown",
    }
}
// Generates a trace ID that will be unique across the system (as long as |request_id| isn't
// reused within this process).  The request ID is shifted into the high 32 bits so that it
// cannot collide with the process handle in the low bits.
fn generate_trace_flow_id(request_id: u32) -> u64 {
    lazy_static! {
        static ref SELF_HANDLE: zx_handle_t = fuchsia_runtime::process_self().raw_handle();
    };
    *SELF_HANDLE as u64 + ((request_id as u64) << 32)
}

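/// A buffer for writes: either a region of a VMO previously registered with the device (see
/// [`BlockClient::attach_vmo`]) or a plain memory slice, which is staged through an internal
/// transfer VMO. A small sketch (assumes `vmo_id` came from `attach_vmo`; offsets and lengths
/// are in bytes and must be multiples of the block size):
///
/// ```ignore
/// let from_vmo = BufferSlice::new_with_vmo_id(&vmo_id, 0, 8192);
/// let data = vec![0xa3u8; 8192];
/// let from_memory = BufferSlice::from(&data[..]);
/// ```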
pub enum BufferSlice<'a> {
    VmoId { vmo_id: &'a VmoId, offset: u64, length: u64 },
    Memory(&'a [u8]),
}

impl<'a> BufferSlice<'a> {
    pub fn new_with_vmo_id(vmo_id: &'a VmoId, offset: u64, length: u64) -> Self {
        BufferSlice::VmoId { vmo_id, offset, length }
    }
}

impl<'a> From<&'a [u8]> for BufferSlice<'a> {
    fn from(buf: &'a [u8]) -> Self {
        BufferSlice::Memory(buf)
    }
}

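/// The mutable counterpart of [`BufferSlice`], used for reads. A small sketch (assumes `vmo_id`
/// came from `attach_vmo`):
///
/// ```ignore
/// let into_vmo = MutableBufferSlice::new_with_vmo_id(&vmo_id, 0, 8192);
/// let mut buf = vec![0u8; 8192];
/// let into_memory = MutableBufferSlice::from(&mut buf[..]);
/// ```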
pub enum MutableBufferSlice<'a> {
    VmoId { vmo_id: &'a VmoId, offset: u64, length: u64 },
    Memory(&'a mut [u8]),
}

impl<'a> MutableBufferSlice<'a> {
    pub fn new_with_vmo_id(vmo_id: &'a VmoId, offset: u64, length: u64) -> Self {
        MutableBufferSlice::VmoId { vmo_id, offset, length }
    }
}

impl<'a> From<&'a mut [u8]> for MutableBufferSlice<'a> {
    fn from(buf: &'a mut [u8]) -> Self {
        MutableBufferSlice::Memory(buf)
    }
}

#[derive(Default)]
struct RequestState {
    result: Option<zx::Status>,
    waker: Option<Waker>,
}

#[derive(Default)]
struct FifoState {
    // The fifo.
    fifo: Option<fasync::Fifo<BlockFifoResponse, BlockFifoRequest>>,

    // The next request ID to be used.
    next_request_id: u32,

    // A queue of messages to be sent on the fifo.
    queue: std::collections::VecDeque<BlockFifoRequest>,

    // Map from request ID to RequestState.
    map: HashMap<u32, RequestState>,

    // The waker for the FifoPoller.
    poller_waker: Option<Waker>,

    // If set, attach a barrier to the next write request.
    attach_barrier: bool,
}

impl FifoState {
    fn terminate(&mut self) {
        self.fifo.take();
        for (_, request_state) in self.map.iter_mut() {
            request_state.result.get_or_insert(zx::Status::CANCELED);
            if let Some(waker) = request_state.waker.take() {
                waker.wake();
            }
        }
        if let Some(waker) = self.poller_waker.take() {
            waker.wake();
        }
    }

    // Returns true if polling should be terminated.
    fn poll_send_requests(&mut self, context: &mut Context<'_>) -> bool {
        let fifo = if let Some(fifo) = self.fifo.as_ref() {
            fifo
        } else {
            return true;
        };

        loop {
            let slice = self.queue.as_slices().0;
            if slice.is_empty() {
                return false;
            }
            match fifo.try_write(context, slice) {
                Poll::Ready(Ok(sent)) => {
                    self.queue.drain(0..sent);
                }
                Poll::Ready(Err(_)) => {
                    self.terminate();
                    return true;
                }
                Poll::Pending => {
                    return false;
                }
            }
        }
    }
}

type FifoStateRef = Arc<Mutex<FifoState>>;

// A future used for fifo responses.
struct ResponseFuture {
    request_id: u32,
    fifo_state: FifoStateRef,
}

impl ResponseFuture {
    fn new(fifo_state: FifoStateRef, request_id: u32) -> Self {
        ResponseFuture { request_id, fifo_state }
    }
}

impl Future for ResponseFuture {
    type Output = Result<(), zx::Status>;

    fn poll(self: Pin<&mut Self>, context: &mut Context<'_>) -> Poll<Self::Output> {
        let mut state = self.fifo_state.lock();
        let request_state = state.map.get_mut(&self.request_id).unwrap();
        if let Some(result) = request_state.result {
            Poll::Ready(result.into())
        } else {
            request_state.waker.replace(context.waker().clone());
            Poll::Pending
        }
    }
}

impl Drop for ResponseFuture {
    fn drop(&mut self) {
        self.fifo_state.lock().map.remove(&self.request_id).unwrap();
    }
}

/// Wraps a vmo-id.  Panics on drop unless the ID has been detached (or taken) first.
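///
/// A typical lifecycle (sketch; assumes `client` implements [`BlockClient`]):
///
/// ```ignore
/// let vmo_id = client.attach_vmo(&vmo).await?;
/// client.read_at(MutableBufferSlice::new_with_vmo_id(&vmo_id, 0, 8192), 0).await?;
/// // detach_vmo consumes the VmoId; dropping a valid VmoId instead would panic.
/// client.detach_vmo(vmo_id).await?;
/// ```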
#[derive(Debug)]
#[must_use]
pub struct VmoId(AtomicU16);

impl VmoId {
    /// VmoIds will normally be vended by attach_vmo, but this constructor can be used in tests.
    pub fn new(id: u16) -> Self {
        Self(AtomicU16::new(id))
    }

    /// Invalidates self and returns a new VmoId with the same underlying ID.
    pub fn take(&self) -> Self {
        Self(AtomicU16::new(self.0.swap(block_driver::BLOCK_VMOID_INVALID, Ordering::Relaxed)))
    }

    pub fn is_valid(&self) -> bool {
        self.id() != block_driver::BLOCK_VMOID_INVALID
    }

    /// Takes the ID.  The caller assumes responsibility for detaching.
    #[must_use]
    pub fn into_id(self) -> u16 {
        self.0.swap(block_driver::BLOCK_VMOID_INVALID, Ordering::Relaxed)
    }

    pub fn id(&self) -> u16 {
        self.0.load(Ordering::Relaxed)
    }
}

impl PartialEq for VmoId {
    fn eq(&self, other: &Self) -> bool {
        self.id() == other.id()
    }
}

impl Eq for VmoId {}

impl Drop for VmoId {
    fn drop(&mut self) {
        assert_eq!(
            self.0.load(Ordering::Relaxed),
            block_driver::BLOCK_VMOID_INVALID,
            "Did you forget to detach?"
        );
    }
}

impl Hash for VmoId {
    fn hash<H: Hasher>(&self, state: &mut H) {
        self.id().hash(state);
    }
}

/// Represents a client connection to a block device.  This is a simplified version of the
/// block.fidl interface.  Most users will use the [`RemoteBlockClient`] implementation of this
/// trait.
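///
/// A write/read round trip sketch (assumes `client` is a connected implementation and the
/// buffer length is a multiple of the block size):
///
/// ```ignore
/// let mut buf = vec![0u8; client.block_size() as usize];
/// client.write_at(BufferSlice::from(&buf[..]), 0).await?;
/// client.read_at(MutableBufferSlice::from(&mut buf[..]), 0).await?;
/// client.flush().await?;
/// ```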
#[async_trait]
pub trait BlockClient: Send + Sync {
    /// Wraps AttachVmo from fuchsia.hardware.block::Block.
    async fn attach_vmo(&self, vmo: &zx::Vmo) -> Result<VmoId, zx::Status>;

    /// Detaches the given vmo-id from the device.
    async fn detach_vmo(&self, vmo_id: VmoId) -> Result<(), zx::Status>;

    /// Reads from the device at |device_offset| into the given buffer slice.
    async fn read_at(
        &self,
        buffer_slice: MutableBufferSlice<'_>,
        device_offset: u64,
    ) -> Result<(), zx::Status> {
        self.read_at_traced(buffer_slice, device_offset, NO_TRACE_ID).await
    }

    async fn read_at_traced(
        &self,
        buffer_slice: MutableBufferSlice<'_>,
        device_offset: u64,
        trace_flow_id: u64,
    ) -> Result<(), zx::Status>;

    /// Writes the data in |buffer_slice| to the device.
    async fn write_at(
        &self,
        buffer_slice: BufferSlice<'_>,
        device_offset: u64,
    ) -> Result<(), zx::Status> {
        self.write_at_with_opts_traced(
            buffer_slice,
            device_offset,
            WriteOptions::empty(),
            NO_TRACE_ID,
        )
        .await
    }

    async fn write_at_with_opts(
        &self,
        buffer_slice: BufferSlice<'_>,
        device_offset: u64,
        opts: WriteOptions,
    ) -> Result<(), zx::Status> {
        self.write_at_with_opts_traced(buffer_slice, device_offset, opts, NO_TRACE_ID).await
    }

    async fn write_at_with_opts_traced(
        &self,
        buffer_slice: BufferSlice<'_>,
        device_offset: u64,
        opts: WriteOptions,
        trace_flow_id: u64,
    ) -> Result<(), zx::Status>;

    /// Trims the given range (in bytes) on the block device.
    async fn trim(&self, device_range: Range<u64>) -> Result<(), zx::Status> {
        self.trim_traced(device_range, NO_TRACE_ID).await
    }

    async fn trim_traced(
        &self,
        device_range: Range<u64>,
        trace_flow_id: u64,
    ) -> Result<(), zx::Status>;

    /// Attaches a barrier to the next write sent to the underlying block device.  This is an
    /// alternative to setting `WriteOptions::PRE_BARRIER` on `write_at_with_opts`, and it makes
    /// it easier to guarantee that the barrier is attached to the correct write operation when
    /// subsequent write operations can get reordered.
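    ///
    /// A sketch of the intended pattern (assumes `journal_bytes` and `offset` are block-aligned):
    ///
    /// ```ignore
    /// client.barrier();
    /// // The barrier is attached to whichever write request the client issues next.
    /// client.write_at(BufferSlice::from(&journal_bytes[..]), offset).await?;
    /// ```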
    fn barrier(&self);

    async fn flush(&self) -> Result<(), zx::Status> {
        self.flush_traced(NO_TRACE_ID).await
    }

    /// Sends a flush request to the underlying block device.
    async fn flush_traced(&self, trace_flow_id: u64) -> Result<(), zx::Status>;

    /// Closes the fifo.
    async fn close(&self) -> Result<(), zx::Status>;

    /// Returns the block size of the device.
    fn block_size(&self) -> u32;

    /// Returns the size, in blocks, of the device.
    fn block_count(&self) -> u64;

    /// Returns the maximum number of blocks which can be transferred in a single request.
    fn max_transfer_blocks(&self) -> Option<NonZero<u32>>;

    /// Returns the block flags reported by the device.
    fn block_flags(&self) -> BlockFlags;

    /// Returns true if the remote fifo is still connected.
    fn is_connected(&self) -> bool;
}

struct Common {
    block_size: u32,
    block_count: u64,
    max_transfer_blocks: Option<NonZero<u32>>,
    block_flags: BlockFlags,
    fifo_state: FifoStateRef,
    temp_vmo: futures::lock::Mutex<zx::Vmo>,
    temp_vmo_id: VmoId,
}

impl Common {
    fn new(
        fifo: fasync::Fifo<BlockFifoResponse, BlockFifoRequest>,
        info: &block::BlockInfo,
        temp_vmo: zx::Vmo,
        temp_vmo_id: VmoId,
    ) -> Self {
        let fifo_state = Arc::new(Mutex::new(FifoState { fifo: Some(fifo), ..Default::default() }));
        fasync::Task::spawn(FifoPoller { fifo_state: fifo_state.clone() }).detach();
        Self {
            block_size: info.block_size,
            block_count: info.block_count,
            max_transfer_blocks: if info.max_transfer_size != MAX_TRANSFER_UNBOUNDED {
                NonZero::new(info.max_transfer_size / info.block_size)
            } else {
                None
            },
            block_flags: info.flags,
            fifo_state,
            temp_vmo: futures::lock::Mutex::new(temp_vmo),
            temp_vmo_id,
        }
    }

    fn to_blocks(&self, bytes: u64) -> Result<u64, zx::Status> {
        if bytes % self.block_size as u64 != 0 {
            Err(zx::Status::INVALID_ARGS)
        } else {
            Ok(bytes / self.block_size as u64)
        }
    }

    // Sends the request and waits for the response.
    async fn send(&self, mut request: BlockFifoRequest) -> Result<(), zx::Status> {
        async move {
            let (request_id, trace_flow_id) = {
                let mut state = self.fifo_state.lock();

                let mut flags = BlockIoFlag::from_bits_retain(request.command.flags);
                if BlockOpcode::from_primitive(request.command.opcode) == Some(BlockOpcode::Write)
                    && state.attach_barrier
                {
                    flags |= BlockIoFlag::PRE_BARRIER;
                    request.command.flags = flags.bits();
                    state.attach_barrier = false;
                }

                if state.fifo.is_none() {
                    // Fifo has been closed.
                    return Err(zx::Status::CANCELED);
                }
                trace::duration!(
                    c"storage",
                    c"block_client::send::start",
                    "op" => opcode_str(request.command.opcode)
                );
                let request_id = state.next_request_id;
                state.next_request_id = state.next_request_id.overflowing_add(1).0;
                assert!(
                    state.map.insert(request_id, RequestState::default()).is_none(),
                    "request id in use!"
                );
                request.reqid = request_id;
                if request.trace_flow_id == NO_TRACE_ID {
                    request.trace_flow_id = generate_trace_flow_id(request_id);
                }
                let trace_flow_id = request.trace_flow_id;
                trace::flow_begin!(c"storage", c"block_client::send", trace_flow_id);
                state.queue.push_back(request);
                if let Some(waker) = state.poller_waker.clone() {
                    state.poll_send_requests(&mut Context::from_waker(&waker));
                }
                (request_id, trace_flow_id)
            };
            ResponseFuture::new(self.fifo_state.clone(), request_id).await?;
            trace::duration!(c"storage", c"block_client::send::end");
            trace::flow_end!(c"storage", c"block_client::send", trace_flow_id);
            Ok(())
        }
        .await
    }

    async fn detach_vmo(&self, vmo_id: VmoId) -> Result<(), zx::Status> {
        self.send(BlockFifoRequest {
            command: BlockFifoCommand {
                opcode: BlockOpcode::CloseVmo.into_primitive(),
                flags: 0,
                ..Default::default()
            },
            vmoid: vmo_id.into_id(),
            ..Default::default()
        })
        .await
    }

    async fn read_at(
        &self,
        buffer_slice: MutableBufferSlice<'_>,
        device_offset: u64,
        trace_flow_id: u64,
    ) -> Result<(), zx::Status> {
        match buffer_slice {
            MutableBufferSlice::VmoId { vmo_id, offset, length } => {
                self.send(BlockFifoRequest {
                    command: BlockFifoCommand {
                        opcode: BlockOpcode::Read.into_primitive(),
                        flags: 0,
                        ..Default::default()
                    },
                    vmoid: vmo_id.id(),
                    length: self
                        .to_blocks(length)?
                        .try_into()
                        .map_err(|_| zx::Status::INVALID_ARGS)?,
                    vmo_offset: self.to_blocks(offset)?,
                    dev_offset: self.to_blocks(device_offset)?,
                    trace_flow_id,
                    ..Default::default()
                })
                .await?
            }
            MutableBufferSlice::Memory(mut slice) => {
                let temp_vmo = self.temp_vmo.lock().await;
                let mut device_block = self.to_blocks(device_offset)?;
                loop {
                    let to_do = std::cmp::min(TEMP_VMO_SIZE, slice.len());
                    let block_count = self.to_blocks(to_do as u64)? as u32;
                    self.send(BlockFifoRequest {
                        command: BlockFifoCommand {
                            opcode: BlockOpcode::Read.into_primitive(),
                            flags: 0,
                            ..Default::default()
                        },
                        vmoid: self.temp_vmo_id.id(),
                        length: block_count,
                        vmo_offset: 0,
                        dev_offset: device_block,
                        trace_flow_id,
                        ..Default::default()
                    })
                    .await?;
                    temp_vmo.read(&mut slice[..to_do], 0)?;
                    if to_do == slice.len() {
                        break;
                    }
                    device_block += block_count as u64;
                    slice = &mut slice[to_do..];
                }
            }
        }
        Ok(())
    }

    async fn write_at(
        &self,
        buffer_slice: BufferSlice<'_>,
        device_offset: u64,
        opts: WriteOptions,
        trace_flow_id: u64,
    ) -> Result<(), zx::Status> {
        let mut flags = BlockIoFlag::empty();

        if opts.contains(WriteOptions::FORCE_ACCESS) {
            flags |= BlockIoFlag::FORCE_ACCESS;
        }

        if opts.contains(WriteOptions::PRE_BARRIER) {
            flags |= BlockIoFlag::PRE_BARRIER;
        }

        match buffer_slice {
            BufferSlice::VmoId { vmo_id, offset, length } => {
                self.send(BlockFifoRequest {
                    command: BlockFifoCommand {
                        opcode: BlockOpcode::Write.into_primitive(),
                        flags: flags.bits(),
                        ..Default::default()
                    },
                    vmoid: vmo_id.id(),
                    length: self
                        .to_blocks(length)?
                        .try_into()
                        .map_err(|_| zx::Status::INVALID_ARGS)?,
                    vmo_offset: self.to_blocks(offset)?,
                    dev_offset: self.to_blocks(device_offset)?,
                    trace_flow_id,
                    ..Default::default()
                })
                .await?;
            }
            BufferSlice::Memory(mut slice) => {
                let temp_vmo = self.temp_vmo.lock().await;
                let mut device_block = self.to_blocks(device_offset)?;
                loop {
                    let to_do = std::cmp::min(TEMP_VMO_SIZE, slice.len());
                    let block_count = self.to_blocks(to_do as u64)? as u32;
                    temp_vmo.write(&slice[..to_do], 0)?;
                    self.send(BlockFifoRequest {
                        command: BlockFifoCommand {
                            opcode: BlockOpcode::Write.into_primitive(),
                            flags: flags.bits(),
                            ..Default::default()
                        },
                        vmoid: self.temp_vmo_id.id(),
                        length: block_count,
                        vmo_offset: 0,
                        dev_offset: device_block,
                        trace_flow_id,
                        ..Default::default()
                    })
                    .await?;
                    if to_do == slice.len() {
                        break;
                    }
                    device_block += block_count as u64;
                    slice = &slice[to_do..];
                }
            }
        }
        Ok(())
    }

    async fn trim(&self, device_range: Range<u64>, trace_flow_id: u64) -> Result<(), zx::Status> {
        let length = self.to_blocks(device_range.end - device_range.start)? as u32;
        let dev_offset = self.to_blocks(device_range.start)?;
        self.send(BlockFifoRequest {
            command: BlockFifoCommand {
                opcode: BlockOpcode::Trim.into_primitive(),
                flags: 0,
                ..Default::default()
            },
            vmoid: block_driver::BLOCK_VMOID_INVALID,
            length,
            dev_offset,
            trace_flow_id,
            ..Default::default()
        })
        .await
    }

    async fn flush(&self, trace_flow_id: u64) -> Result<(), zx::Status> {
        self.send(BlockFifoRequest {
            command: BlockFifoCommand {
                opcode: BlockOpcode::Flush.into_primitive(),
                flags: 0,
                ..Default::default()
            },
            vmoid: block_driver::BLOCK_VMOID_INVALID,
            trace_flow_id,
            ..Default::default()
        })
        .await
    }

    fn barrier(&self) {
        self.fifo_state.lock().attach_barrier = true;
    }

    fn block_size(&self) -> u32 {
        self.block_size
    }

    fn block_count(&self) -> u64 {
        self.block_count
    }

    fn max_transfer_blocks(&self) -> Option<NonZero<u32>> {
        self.max_transfer_blocks.clone()
    }

    fn block_flags(&self) -> BlockFlags {
        self.block_flags
    }

    fn is_connected(&self) -> bool {
        self.fifo_state.lock().fifo.is_some()
    }
}

impl Drop for Common {
    fn drop(&mut self) {
        // It's OK to leak the VMO id because the server will dump all VMOs when the fifo is torn
        // down.
        let _ = self.temp_vmo_id.take().into_id();
        self.fifo_state.lock().terminate();
    }
}

/// RemoteBlockClient is a BlockClient that communicates with a real block device over FIDL.
pub struct RemoteBlockClient {
    session: block::SessionProxy,
    common: Common,
}

pub trait AsBlockProxy {
    fn get_info(&self) -> impl Future<Output = Result<block::BlockGetInfoResult, fidl::Error>>;

    fn open_session(&self, session: ServerEnd<block::SessionMarker>) -> Result<(), fidl::Error>;
}

impl<T: AsBlockProxy> AsBlockProxy for &T {
    fn get_info(&self) -> impl Future<Output = Result<block::BlockGetInfoResult, fidl::Error>> {
        AsBlockProxy::get_info(*self)
    }
    fn open_session(&self, session: ServerEnd<block::SessionMarker>) -> Result<(), fidl::Error> {
        AsBlockProxy::open_session(*self, session)
    }
}

macro_rules! impl_as_block_proxy {
    ($name:ident) => {
        impl AsBlockProxy for $name {
            async fn get_info(&self) -> Result<block::BlockGetInfoResult, fidl::Error> {
                $name::get_info(self).await
            }

            fn open_session(
                &self,
                session: ServerEnd<block::SessionMarker>,
            ) -> Result<(), fidl::Error> {
                $name::open_session(self, session)
            }
        }
    };
}

impl_as_block_proxy!(BlockProxy);
impl_as_block_proxy!(PartitionProxy);
impl_as_block_proxy!(VolumeProxy);

impl RemoteBlockClient {
    /// Returns a connection to a remote block device via the given channel.
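    ///
    /// A sketch (assumes `proxy` implements [`AsBlockProxy`], e.g. a
    /// `fidl_fuchsia_hardware_block::BlockProxy`):
    ///
    /// ```ignore
    /// let client = RemoteBlockClient::new(proxy).await?;
    /// assert!(client.is_connected());
    /// ```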
    pub async fn new(remote: impl AsBlockProxy) -> Result<Self, zx::Status> {
        let info =
            remote.get_info().await.map_err(fidl_to_status)?.map_err(zx::Status::from_raw)?;
        let (session, server) = fidl::endpoints::create_proxy();
        let () = remote.open_session(server).map_err(fidl_to_status)?;
        Self::from_session(info, session).await
    }

    pub async fn from_session(
        info: block::BlockInfo,
        session: block::SessionProxy,
    ) -> Result<Self, zx::Status> {
        let fifo =
            session.get_fifo().await.map_err(fidl_to_status)?.map_err(zx::Status::from_raw)?;
        let fifo = fasync::Fifo::from_fifo(fifo);
        let temp_vmo = zx::Vmo::create(TEMP_VMO_SIZE as u64)?;
        let dup = temp_vmo.duplicate_handle(zx::Rights::SAME_RIGHTS)?;
        let vmo_id =
            session.attach_vmo(dup).await.map_err(fidl_to_status)?.map_err(zx::Status::from_raw)?;
        let vmo_id = VmoId::new(vmo_id.id);
        Ok(RemoteBlockClient { session, common: Common::new(fifo, &info, temp_vmo, vmo_id) })
    }
}

#[async_trait]
impl BlockClient for RemoteBlockClient {
    async fn attach_vmo(&self, vmo: &zx::Vmo) -> Result<VmoId, zx::Status> {
        let dup = vmo.duplicate_handle(zx::Rights::SAME_RIGHTS)?;
        let vmo_id = self
            .session
            .attach_vmo(dup)
            .await
            .map_err(fidl_to_status)?
            .map_err(zx::Status::from_raw)?;
        Ok(VmoId::new(vmo_id.id))
    }

    async fn detach_vmo(&self, vmo_id: VmoId) -> Result<(), zx::Status> {
        self.common.detach_vmo(vmo_id).await
    }

    async fn read_at_traced(
        &self,
        buffer_slice: MutableBufferSlice<'_>,
        device_offset: u64,
        trace_flow_id: u64,
    ) -> Result<(), zx::Status> {
        self.common.read_at(buffer_slice, device_offset, trace_flow_id).await
    }

    async fn write_at_with_opts_traced(
        &self,
        buffer_slice: BufferSlice<'_>,
        device_offset: u64,
        opts: WriteOptions,
        trace_flow_id: u64,
    ) -> Result<(), zx::Status> {
        self.common.write_at(buffer_slice, device_offset, opts, trace_flow_id).await
    }

    async fn trim_traced(&self, range: Range<u64>, trace_flow_id: u64) -> Result<(), zx::Status> {
        self.common.trim(range, trace_flow_id).await
    }

    async fn flush_traced(&self, trace_flow_id: u64) -> Result<(), zx::Status> {
        self.common.flush(trace_flow_id).await
    }

    fn barrier(&self) {
        self.common.barrier()
    }

    async fn close(&self) -> Result<(), zx::Status> {
        let () =
            self.session.close().await.map_err(fidl_to_status)?.map_err(zx::Status::from_raw)?;
        Ok(())
    }

    fn block_size(&self) -> u32 {
        self.common.block_size()
    }

    fn block_count(&self) -> u64 {
        self.common.block_count()
    }

    fn max_transfer_blocks(&self) -> Option<NonZero<u32>> {
        self.common.max_transfer_blocks()
    }

    fn block_flags(&self) -> BlockFlags {
        self.common.block_flags()
    }

    fn is_connected(&self) -> bool {
        self.common.is_connected()
    }
}

pub struct RemoteBlockClientSync {
    session: block::SessionSynchronousProxy,
    common: Common,
}

impl RemoteBlockClientSync {
    /// Returns a connection to a remote block device via the given channel, but spawns a separate
    /// thread for polling the fifo, which makes it work in cases where no executor is configured
    /// for the calling thread.
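    ///
    /// A sketch (assumes `client_end` is a `ClientEnd<BlockMarker>`; note that these methods
    /// block the calling thread):
    ///
    /// ```ignore
    /// let client = RemoteBlockClientSync::new(client_end)?;
    /// let mut buf = vec![0u8; client.block_size() as usize];
    /// client.read_at(MutableBufferSlice::from(&mut buf[..]), 0)?;
    /// ```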
    pub fn new(
        client_end: fidl::endpoints::ClientEnd<block::BlockMarker>,
    ) -> Result<Self, zx::Status> {
        let remote = block::BlockSynchronousProxy::new(client_end.into_channel());
        let info = remote
            .get_info(zx::MonotonicInstant::INFINITE)
            .map_err(fidl_to_status)?
            .map_err(zx::Status::from_raw)?;
        let (client, server) = fidl::endpoints::create_endpoints();
        let () = remote.open_session(server).map_err(fidl_to_status)?;
        let session = block::SessionSynchronousProxy::new(client.into_channel());
        let fifo = session
            .get_fifo(zx::MonotonicInstant::INFINITE)
            .map_err(fidl_to_status)?
            .map_err(zx::Status::from_raw)?;
        let temp_vmo = zx::Vmo::create(TEMP_VMO_SIZE as u64)?;
        let dup = temp_vmo.duplicate_handle(zx::Rights::SAME_RIGHTS)?;
        let vmo_id = session
            .attach_vmo(dup, zx::MonotonicInstant::INFINITE)
            .map_err(fidl_to_status)?
            .map_err(zx::Status::from_raw)?;
        let vmo_id = VmoId::new(vmo_id.id);

        // The fifo needs to be instantiated from the thread that has the executor as that's where
        // the fifo registers for notifications to be delivered.
        let (sender, receiver) = oneshot::channel::<Result<Self, zx::Status>>();
        std::thread::spawn(move || {
            let mut executor = fasync::LocalExecutor::new();
            let fifo = fasync::Fifo::from_fifo(fifo);
            let common = Common::new(fifo, &info, temp_vmo, vmo_id);
            let fifo_state = common.fifo_state.clone();
            let _ = sender.send(Ok(RemoteBlockClientSync { session, common }));
            executor.run_singlethreaded(FifoPoller { fifo_state });
        });
        block_on(receiver).map_err(|_| zx::Status::CANCELED)?
    }

    pub fn attach_vmo(&self, vmo: &zx::Vmo) -> Result<VmoId, zx::Status> {
        let dup = vmo.duplicate_handle(zx::Rights::SAME_RIGHTS)?;
        let vmo_id = self
            .session
            .attach_vmo(dup, zx::MonotonicInstant::INFINITE)
            .map_err(fidl_to_status)?
            .map_err(zx::Status::from_raw)?;
        Ok(VmoId::new(vmo_id.id))
    }

    pub fn detach_vmo(&self, vmo_id: VmoId) -> Result<(), zx::Status> {
        block_on(self.common.detach_vmo(vmo_id))
    }

    pub fn read_at(
        &self,
        buffer_slice: MutableBufferSlice<'_>,
        device_offset: u64,
    ) -> Result<(), zx::Status> {
        block_on(self.common.read_at(buffer_slice, device_offset, NO_TRACE_ID))
    }

    pub fn write_at(
        &self,
        buffer_slice: BufferSlice<'_>,
        device_offset: u64,
    ) -> Result<(), zx::Status> {
        block_on(self.common.write_at(
            buffer_slice,
            device_offset,
            WriteOptions::empty(),
            NO_TRACE_ID,
        ))
    }

    pub fn flush(&self) -> Result<(), zx::Status> {
        block_on(self.common.flush(NO_TRACE_ID))
    }

    pub fn close(&self) -> Result<(), zx::Status> {
        let () = self
            .session
            .close(zx::MonotonicInstant::INFINITE)
            .map_err(fidl_to_status)?
            .map_err(zx::Status::from_raw)?;
        Ok(())
    }

    pub fn block_size(&self) -> u32 {
        self.common.block_size()
    }

    pub fn block_count(&self) -> u64 {
        self.common.block_count()
    }

    pub fn is_connected(&self) -> bool {
        self.common.is_connected()
    }
}

impl Drop for RemoteBlockClientSync {
    fn drop(&mut self) {
        // Ignore errors here as there is not much we can do about it.
        let _ = self.close();
    }
}

// FifoPoller is a future responsible for sending and receiving from the fifo.
struct FifoPoller {
    fifo_state: FifoStateRef,
}

impl Future for FifoPoller {
    type Output = ();

    fn poll(self: Pin<&mut Self>, context: &mut Context<'_>) -> Poll<Self::Output> {
        let mut state_lock = self.fifo_state.lock();
        let state = state_lock.deref_mut(); // So that we can split the borrow.

        // Send requests.
        if state.poll_send_requests(context) {
            return Poll::Ready(());
        }

        // Receive responses.
        let fifo = state.fifo.as_ref().unwrap(); // Safe because poll_send_requests checks.
        loop {
            let mut response = MaybeUninit::uninit();
            match fifo.try_read(context, &mut response) {
                Poll::Pending => {
                    state.poller_waker = Some(context.waker().clone());
                    return Poll::Pending;
                }
                Poll::Ready(Ok(_)) => {
                    let response = unsafe { response.assume_init() };
                    let request_id = response.reqid;
                    // If the request isn't in the map, assume that it's a cancelled read.
                    if let Some(request_state) = state.map.get_mut(&request_id) {
                        request_state.result.replace(zx::Status::from_raw(response.status));
                        if let Some(waker) = request_state.waker.take() {
                            waker.wake();
                        }
                    }
                }
                Poll::Ready(Err(_)) => {
                    state.terminate();
                    return Poll::Ready(());
                }
            }
        }
    }
}

#[cfg(test)]
mod tests {
    use super::{
        BlockClient, BlockFifoRequest, BlockFifoResponse, BufferSlice, MutableBufferSlice,
        RemoteBlockClient, RemoteBlockClientSync, WriteOptions,
    };
    use block_server::{BlockServer, DeviceInfo, PartitionInfo};
    use fidl::endpoints::RequestStream as _;
    use futures::future::{AbortHandle, Abortable, TryFutureExt as _};
    use futures::join;
    use futures::stream::futures_unordered::FuturesUnordered;
    use futures::stream::StreamExt as _;
    use ramdevice_client::RamdiskClient;
    use std::borrow::Cow;
    use std::num::NonZero;
    use std::sync::atomic::{AtomicBool, Ordering};
    use std::sync::Arc;
    use {fidl_fuchsia_hardware_block as block, fuchsia_async as fasync};

    const RAMDISK_BLOCK_SIZE: u64 = 1024;
    const RAMDISK_BLOCK_COUNT: u64 = 1024;

    pub async fn make_ramdisk() -> (RamdiskClient, block::BlockProxy, RemoteBlockClient) {
        let ramdisk = RamdiskClient::create(RAMDISK_BLOCK_SIZE, RAMDISK_BLOCK_COUNT)
            .await
            .expect("RamdiskClient::create failed");
        let client_end = ramdisk.open().expect("ramdisk.open failed");
        let proxy = client_end.into_proxy();
        let block_client = RemoteBlockClient::new(proxy).await.expect("new failed");
        assert_eq!(block_client.block_size(), 1024);
        let client_end = ramdisk.open().expect("ramdisk.open failed");
        let proxy = client_end.into_proxy();
        (ramdisk, proxy, block_client)
    }

    #[fuchsia::test]
    async fn test_against_ram_disk() {
        let (_ramdisk, _block_proxy, block_client) = make_ramdisk().await;

        let vmo = zx::Vmo::create(131072).expect("Vmo::create failed");
        vmo.write(b"hello", 5).expect("vmo.write failed");
        let vmo_id = block_client.attach_vmo(&vmo).await.expect("attach_vmo failed");
        block_client
            .write_at(BufferSlice::new_with_vmo_id(&vmo_id, 0, 1024), 0)
            .await
            .expect("write_at failed");
        block_client
            .read_at(MutableBufferSlice::new_with_vmo_id(&vmo_id, 1024, 2048), 0)
            .await
            .expect("read_at failed");
        let mut buf: [u8; 5] = Default::default();
        vmo.read(&mut buf, 1029).expect("vmo.read failed");
        assert_eq!(&buf, b"hello");
        block_client.detach_vmo(vmo_id).await.expect("detach_vmo failed");
    }

    #[fuchsia::test]
    async fn test_alignment() {
        let (_ramdisk, _block_proxy, block_client) = make_ramdisk().await;
        let vmo = zx::Vmo::create(131072).expect("Vmo::create failed");
        let vmo_id = block_client.attach_vmo(&vmo).await.expect("attach_vmo failed");
        block_client
            .write_at(BufferSlice::new_with_vmo_id(&vmo_id, 0, 1024), 1)
            .await
            .expect_err("expected failure due to bad alignment");
        block_client.detach_vmo(vmo_id).await.expect("detach_vmo failed");
    }

    #[fuchsia::test]
    async fn test_parallel_io() {
        let (_ramdisk, _block_proxy, block_client) = make_ramdisk().await;
        let vmo = zx::Vmo::create(131072).expect("Vmo::create failed");
        let vmo_id = block_client.attach_vmo(&vmo).await.expect("attach_vmo failed");
        let mut reads = Vec::new();
        for _ in 0..1024 {
            reads.push(
                block_client
                    .read_at(MutableBufferSlice::new_with_vmo_id(&vmo_id, 0, 1024), 0)
                    .inspect_err(|e| panic!("read should have succeeded: {}", e)),
            );
        }
        futures::future::join_all(reads).await;
        block_client.detach_vmo(vmo_id).await.expect("detach_vmo failed");
    }

    #[fuchsia::test]
    async fn test_closed_device() {
        let (ramdisk, _block_proxy, block_client) = make_ramdisk().await;
        let vmo = zx::Vmo::create(131072).expect("Vmo::create failed");
        let vmo_id = block_client.attach_vmo(&vmo).await.expect("attach_vmo failed");
        let mut reads = Vec::new();
        for _ in 0..1024 {
            reads.push(
                block_client.read_at(MutableBufferSlice::new_with_vmo_id(&vmo_id, 0, 1024), 0),
            );
        }
        assert!(block_client.is_connected());
        let _ = futures::join!(futures::future::join_all(reads), async {
            ramdisk.destroy().await.expect("ramdisk.destroy failed")
        });
        // Destroying the ramdisk is asynchronous. Keep issuing reads until they start failing.
        while block_client
            .read_at(MutableBufferSlice::new_with_vmo_id(&vmo_id, 0, 1024), 0)
            .await
            .is_ok()
        {}

        // Sometimes the FIFO will start rejecting requests before the FIFO is actually closed, so
        // we can get false positives from is_connected.
        while block_client.is_connected() {
            // Sleep for a bit to minimise lock contention.
            fasync::Timer::new(fasync::MonotonicInstant::after(
                zx::MonotonicDuration::from_millis(500),
            ))
            .await;
        }

        // But once is_connected returns false, it should stay false.
        assert_eq!(block_client.is_connected(), false);
        let _ = block_client.detach_vmo(vmo_id).await;
    }

    #[fuchsia::test]
    async fn test_cancelled_reads() {
        let (_ramdisk, _block_proxy, block_client) = make_ramdisk().await;
        let vmo = zx::Vmo::create(131072).expect("Vmo::create failed");
        let vmo_id = block_client.attach_vmo(&vmo).await.expect("attach_vmo failed");
        {
            let mut reads = FuturesUnordered::new();
            for _ in 0..1024 {
                reads.push(
                    block_client.read_at(MutableBufferSlice::new_with_vmo_id(&vmo_id, 0, 1024), 0),
                );
            }
            // Read the first 500 results and then dump the rest.
            for _ in 0..500 {
                reads.next().await;
            }
        }
        block_client.detach_vmo(vmo_id).await.expect("detach_vmo failed");
    }

    #[fuchsia::test]
    async fn test_parallel_large_read_and_write_with_memory_succeeds() {
        let (_ramdisk, _block_proxy, block_client) = make_ramdisk().await;
        let block_client_ref = &block_client;
        let test_one = |offset, len, fill| async move {
            let buf = vec![fill; len];
            block_client_ref.write_at(buf[..].into(), offset).await.expect("write_at failed");
            // Read back an extra block either side.
            let mut read_buf = vec![0u8; len + 2 * RAMDISK_BLOCK_SIZE as usize];
            block_client_ref
                .read_at(read_buf.as_mut_slice().into(), offset - RAMDISK_BLOCK_SIZE)
                .await
                .expect("read_at failed");
            assert_eq!(
                &read_buf[0..RAMDISK_BLOCK_SIZE as usize],
                &[0; RAMDISK_BLOCK_SIZE as usize][..]
            );
            assert_eq!(
                &read_buf[RAMDISK_BLOCK_SIZE as usize..RAMDISK_BLOCK_SIZE as usize + len],
                &buf[..]
            );
            assert_eq!(
                &read_buf[RAMDISK_BLOCK_SIZE as usize + len..],
                &[0; RAMDISK_BLOCK_SIZE as usize][..]
            );
        };
        const WRITE_LEN: usize = super::TEMP_VMO_SIZE * 3 + RAMDISK_BLOCK_SIZE as usize;
        join!(
            test_one(RAMDISK_BLOCK_SIZE, WRITE_LEN, 0xa3u8),
            test_one(2 * RAMDISK_BLOCK_SIZE + WRITE_LEN as u64, WRITE_LEN, 0x7fu8)
        );
    }

    // Implements a dummy server which can be used by test cases to verify whether channel
    // messages and fifo operations are being received, via the channel_handler and fifo_handler
    // callbacks respectively.
    struct FakeBlockServer<'a> {
        server_channel: Option<fidl::endpoints::ServerEnd<block::BlockMarker>>,
        channel_handler: Box<dyn Fn(&block::SessionRequest) -> bool + 'a>,
        fifo_handler: Box<dyn Fn(BlockFifoRequest) -> BlockFifoResponse + 'a>,
    }

    impl<'a> FakeBlockServer<'a> {
        // Creates a new FakeBlockServer given a channel to listen on.
        //
        // The 'channel_handler' and 'fifo_handler' closures allow for customizing how the server
        // handles requests received from the channel or the fifo, respectively.
        //
        // 'channel_handler' receives a message before it is handled by the default implementation
        // and can return 'true' to indicate all processing is done and no further processing of
        // that message is required.
        //
        // 'fifo_handler' takes as input a BlockFifoRequest and produces a response which the
        // FakeBlockServer will send over the fifo.
        fn new(
            server_channel: fidl::endpoints::ServerEnd<block::BlockMarker>,
            channel_handler: impl Fn(&block::SessionRequest) -> bool + 'a,
            fifo_handler: impl Fn(BlockFifoRequest) -> BlockFifoResponse + 'a,
        ) -> FakeBlockServer<'a> {
            FakeBlockServer {
                server_channel: Some(server_channel),
                channel_handler: Box::new(channel_handler),
                fifo_handler: Box::new(fifo_handler),
            }
        }

        // Runs the server.
        async fn run(&mut self) {
            let server = self.server_channel.take().unwrap();

            // Set up a mock server.
            let (server_fifo, client_fifo) =
                zx::Fifo::<BlockFifoRequest, BlockFifoResponse>::create(16)
                    .expect("Fifo::create failed");
            let maybe_server_fifo = fuchsia_sync::Mutex::new(Some(client_fifo));

            let (fifo_future_abort, fifo_future_abort_registration) = AbortHandle::new_pair();
            let fifo_future = Abortable::new(
                async {
                    let mut fifo = fasync::Fifo::from_fifo(server_fifo);
                    let (mut reader, mut writer) = fifo.async_io();
                    let mut request = BlockFifoRequest::default();
                    loop {
                        match reader.read_entries(&mut request).await {
                            Ok(1) => {}
                            Err(zx::Status::PEER_CLOSED) => break,
                            Err(e) => panic!("read_entry failed {:?}", e),
                            _ => unreachable!(),
                        };

                        let response = self.fifo_handler.as_ref()(request);
                        writer
                            .write_entries(std::slice::from_ref(&response))
                            .await
                            .expect("write_entries failed");
                    }
                },
                fifo_future_abort_registration,
            );

            let channel_future = async {
                server
                    .into_stream()
                    .for_each_concurrent(None, |request| async {
                        let request = request.expect("unexpected fidl error");

                        match request {
                            block::BlockRequest::GetInfo { responder } => {
                                responder
                                    .send(Ok(&block::BlockInfo {
                                        block_count: 1024,
                                        block_size: 512,
                                        max_transfer_size: 1024 * 1024,
                                        flags: block::Flag::empty(),
                                    }))
                                    .expect("send failed");
                            }
                            block::BlockRequest::OpenSession { session, control_handle: _ } => {
                                let stream = session.into_stream();
                                stream
                                    .for_each(|request| async {
                                        let request = request.expect("unexpected fidl error");
                                        // Give the test a chance to register and potentially
                                        // handle the event.
                                        if self.channel_handler.as_ref()(&request) {
                                            return;
                                        }
                                        match request {
                                            block::SessionRequest::GetFifo { responder } => {
                                                match maybe_server_fifo.lock().take() {
                                                    Some(fifo) => {
                                                        responder.send(Ok(fifo.downcast()))
                                                    }
                                                    None => responder.send(Err(
                                                        zx::Status::NO_RESOURCES.into_raw(),
                                                    )),
                                                }
                                                .expect("send failed")
                                            }
                                            block::SessionRequest::AttachVmo {
                                                vmo: _,
                                                responder,
                                            } => responder
                                                .send(Ok(&block::VmoId { id: 1 }))
                                                .expect("send failed"),
                                            block::SessionRequest::Close { responder } => {
                                                fifo_future_abort.abort();
                                                responder.send(Ok(())).expect("send failed")
                                            }
                                        }
                                    })
                                    .await
                            }
                            _ => panic!("Unexpected message"),
                        }
                    })
                    .await;
            };

            let _result = join!(fifo_future, channel_future);
            // _result can be Err(Aborted) since the Close handler calls abort(), but that's
            // expected.
        }
    }

    #[fuchsia::test]
    async fn test_block_close_is_called() {
        let close_called = fuchsia_sync::Mutex::new(false);
        let (client_end, server) = fidl::endpoints::create_endpoints::<block::BlockMarker>();

        std::thread::spawn(move || {
            let _block_client =
                RemoteBlockClientSync::new(client_end).expect("RemoteBlockClientSync::new failed");
            // The drop here should cause Close to be sent.
        });

        let channel_handler = |request: &block::SessionRequest| -> bool {
            if let block::SessionRequest::Close { .. } = request {
                *close_called.lock() = true;
            }
            false
        };
        FakeBlockServer::new(server, channel_handler, |_| unreachable!()).run().await;

        // After the server has finished running, we can check to see that close was called.
        assert!(*close_called.lock());
    }

    #[fuchsia::test]
    async fn test_block_flush_is_called() {
        let (proxy, stream) = fidl::endpoints::create_proxy_and_stream::<block::BlockMarker>();

        struct Interface {
            flush_called: Arc<AtomicBool>,
        }
        impl block_server::async_interface::Interface for Interface {
            async fn get_info(&self) -> Result<Cow<'_, DeviceInfo>, zx::Status> {
                Ok(Cow::Owned(DeviceInfo::Partition(PartitionInfo {
                    device_flags: fidl_fuchsia_hardware_block::Flag::empty(),
                    max_transfer_blocks: None,
                    block_range: Some(0..1000),
                    type_guid: [0; 16],
                    instance_guid: [0; 16],
                    name: "foo".to_string(),
                    flags: 0,
                })))
            }

            async fn read(
                &self,
                _device_block_offset: u64,
                _block_count: u32,
                _vmo: &Arc<zx::Vmo>,
                _vmo_offset: u64,
                _trace_flow_id: Option<NonZero<u64>>,
            ) -> Result<(), zx::Status> {
                unreachable!();
            }

            fn barrier(&self) -> Result<(), zx::Status> {
                unreachable!()
            }

            async fn write(
                &self,
                _device_block_offset: u64,
                _block_count: u32,
                _vmo: &Arc<zx::Vmo>,
                _vmo_offset: u64,
                _opts: WriteOptions,
                _trace_flow_id: Option<NonZero<u64>>,
            ) -> Result<(), zx::Status> {
                unreachable!();
            }

            async fn flush(&self, _trace_flow_id: Option<NonZero<u64>>) -> Result<(), zx::Status> {
                self.flush_called.store(true, Ordering::Relaxed);
                Ok(())
            }

            async fn trim(
                &self,
                _device_block_offset: u64,
                _block_count: u32,
                _trace_flow_id: Option<NonZero<u64>>,
            ) -> Result<(), zx::Status> {
                unreachable!();
            }
        }

        let flush_called = Arc::new(AtomicBool::new(false));

        futures::join!(
            async {
                let block_client = RemoteBlockClient::new(proxy).await.expect("new failed");

                block_client.flush().await.expect("flush failed");
            },
            async {
                let block_server = BlockServer::new(
                    512,
                    Arc::new(Interface { flush_called: flush_called.clone() }),
                );
                block_server.handle_requests(stream.cast_stream()).await.unwrap();
            }
        );

        assert!(flush_called.load(Ordering::Relaxed));
    }

    #[fuchsia::test]
    async fn test_trace_flow_ids_set() {
        let (proxy, server) = fidl::endpoints::create_proxy();

        futures::join!(
            async {
                let block_client = RemoteBlockClient::new(proxy).await.expect("new failed");
                block_client.flush().await.expect("flush failed");
            },
            async {
                let flow_id: fuchsia_sync::Mutex<Option<u64>> = fuchsia_sync::Mutex::new(None);
                let fifo_handler = |request: BlockFifoRequest| -> BlockFifoResponse {
                    if request.trace_flow_id > 0 {
                        *flow_id.lock() = Some(request.trace_flow_id);
                    }
                    BlockFifoResponse {
                        status: zx::Status::OK.into_raw(),
                        reqid: request.reqid,
                        ..Default::default()
                    }
                };
                FakeBlockServer::new(server, |_| false, fifo_handler).run().await;
                // After the server has finished running, verify the trace flow ID was set to some
                // value.
                assert!(flow_id.lock().is_some());
            }
        );
    }
}