1use async_trait::async_trait;
6use fidl::endpoints::ServerEnd;
7use fidl_fuchsia_hardware_block::{BlockProxy, MAX_TRANSFER_UNBOUNDED};
8use fidl_fuchsia_hardware_block_partition::PartitionProxy;
9use fidl_fuchsia_hardware_block_volume::VolumeProxy;
10use fuchsia_sync::Mutex;
11use futures::channel::oneshot;
12use futures::executor::block_on;
13use lazy_static::lazy_static;
14use std::collections::HashMap;
15use std::future::Future;
16use std::hash::{Hash, Hasher};
17use std::mem::MaybeUninit;
18use std::num::NonZero;
19use std::ops::{DerefMut, Range};
20use std::pin::Pin;
21use std::sync::atomic::{AtomicU16, Ordering};
22use std::sync::Arc;
23use std::task::{Context, Poll, Waker};
24use zx::sys::zx_handle_t;
25use zx::{self as zx, HandleBased as _};
26use {
27 fidl_fuchsia_hardware_block as block, fidl_fuchsia_hardware_block_driver as block_driver,
28 fuchsia_async as fasync, storage_trace as trace,
29};
30
31pub use cache::Cache;
32
33pub use block::Flag as BlockFlags;
34
35pub use block_protocol::*;
36
37pub mod cache;
38
39const TEMP_VMO_SIZE: usize = 65536;
40
41pub const NO_TRACE_ID: u64 = 0;
43
44pub use block_driver::{BlockIoFlag, BlockOpcode};
45
46fn fidl_to_status(error: fidl::Error) -> zx::Status {
47 match error {
48 fidl::Error::ClientChannelClosed { status, .. } => status,
49 _ => zx::Status::INTERNAL,
50 }
51}
52
53fn opcode_str(opcode: u8) -> &'static str {
54 match BlockOpcode::from_primitive(opcode) {
55 Some(BlockOpcode::Read) => "read",
56 Some(BlockOpcode::Write) => "write",
57 Some(BlockOpcode::Flush) => "flush",
58 Some(BlockOpcode::Trim) => "trim",
59 Some(BlockOpcode::CloseVmo) => "close_vmo",
60 None => "unknown",
61 }
62}
63
64fn generate_trace_flow_id(request_id: u32) -> u64 {
67 lazy_static! {
68 static ref SELF_HANDLE: zx_handle_t = fuchsia_runtime::process_self().raw_handle();
69 };
70 *SELF_HANDLE as u64 + (request_id as u64) << 32
71}
72
73pub enum BufferSlice<'a> {
74 VmoId { vmo_id: &'a VmoId, offset: u64, length: u64 },
75 Memory(&'a [u8]),
76}
77
78impl<'a> BufferSlice<'a> {
79 pub fn new_with_vmo_id(vmo_id: &'a VmoId, offset: u64, length: u64) -> Self {
80 BufferSlice::VmoId { vmo_id, offset, length }
81 }
82}
83
84impl<'a> From<&'a [u8]> for BufferSlice<'a> {
85 fn from(buf: &'a [u8]) -> Self {
86 BufferSlice::Memory(buf)
87 }
88}
89
90pub enum MutableBufferSlice<'a> {
91 VmoId { vmo_id: &'a VmoId, offset: u64, length: u64 },
92 Memory(&'a mut [u8]),
93}
94
95impl<'a> MutableBufferSlice<'a> {
96 pub fn new_with_vmo_id(vmo_id: &'a VmoId, offset: u64, length: u64) -> Self {
97 MutableBufferSlice::VmoId { vmo_id, offset, length }
98 }
99}
100
101impl<'a> From<&'a mut [u8]> for MutableBufferSlice<'a> {
102 fn from(buf: &'a mut [u8]) -> Self {
103 MutableBufferSlice::Memory(buf)
104 }
105}
106
107#[derive(Default)]
108struct RequestState {
109 result: Option<zx::Status>,
110 waker: Option<Waker>,
111}
112
113#[derive(Default)]
114struct FifoState {
115 fifo: Option<fasync::Fifo<BlockFifoResponse, BlockFifoRequest>>,
117
118 next_request_id: u32,
120
121 queue: std::collections::VecDeque<BlockFifoRequest>,
123
124 map: HashMap<u32, RequestState>,
126
127 poller_waker: Option<Waker>,
129
130 attach_barrier: bool,
132}
133
134impl FifoState {
135 fn terminate(&mut self) {
136 self.fifo.take();
137 for (_, request_state) in self.map.iter_mut() {
138 request_state.result.get_or_insert(zx::Status::CANCELED);
139 if let Some(waker) = request_state.waker.take() {
140 waker.wake();
141 }
142 }
143 if let Some(waker) = self.poller_waker.take() {
144 waker.wake();
145 }
146 }
147
148 fn poll_send_requests(&mut self, context: &mut Context<'_>) -> bool {
150 let fifo = if let Some(fifo) = self.fifo.as_ref() {
151 fifo
152 } else {
153 return true;
154 };
155
156 loop {
157 let slice = self.queue.as_slices().0;
158 if slice.is_empty() {
159 return false;
160 }
161 match fifo.try_write(context, slice) {
162 Poll::Ready(Ok(sent)) => {
163 self.queue.drain(0..sent);
164 }
165 Poll::Ready(Err(_)) => {
166 self.terminate();
167 return true;
168 }
169 Poll::Pending => {
170 return false;
171 }
172 }
173 }
174 }
175}
176
177type FifoStateRef = Arc<Mutex<FifoState>>;
178
179struct ResponseFuture {
181 request_id: u32,
182 fifo_state: FifoStateRef,
183}
184
185impl ResponseFuture {
186 fn new(fifo_state: FifoStateRef, request_id: u32) -> Self {
187 ResponseFuture { request_id, fifo_state }
188 }
189}
190
191impl Future for ResponseFuture {
192 type Output = Result<(), zx::Status>;
193
194 fn poll(self: Pin<&mut Self>, context: &mut Context<'_>) -> Poll<Self::Output> {
195 let mut state = self.fifo_state.lock();
196 let request_state = state.map.get_mut(&self.request_id).unwrap();
197 if let Some(result) = request_state.result {
198 Poll::Ready(result.into())
199 } else {
200 request_state.waker.replace(context.waker().clone());
201 Poll::Pending
202 }
203 }
204}
205
206impl Drop for ResponseFuture {
207 fn drop(&mut self) {
208 self.fifo_state.lock().map.remove(&self.request_id).unwrap();
209 }
210}
211
212#[derive(Debug)]
214#[must_use]
215pub struct VmoId(AtomicU16);
216
217impl VmoId {
218 pub fn new(id: u16) -> Self {
220 Self(AtomicU16::new(id))
221 }
222
223 pub fn take(&self) -> Self {
225 Self(AtomicU16::new(self.0.swap(block_driver::BLOCK_VMOID_INVALID, Ordering::Relaxed)))
226 }
227
228 pub fn is_valid(&self) -> bool {
229 self.id() != block_driver::BLOCK_VMOID_INVALID
230 }
231
232 #[must_use]
234 pub fn into_id(self) -> u16 {
235 self.0.swap(block_driver::BLOCK_VMOID_INVALID, Ordering::Relaxed)
236 }
237
238 pub fn id(&self) -> u16 {
239 self.0.load(Ordering::Relaxed)
240 }
241}
242
243impl PartialEq for VmoId {
244 fn eq(&self, other: &Self) -> bool {
245 self.id() == other.id()
246 }
247}
248
249impl Eq for VmoId {}
250
251impl Drop for VmoId {
252 fn drop(&mut self) {
253 assert_eq!(
254 self.0.load(Ordering::Relaxed),
255 block_driver::BLOCK_VMOID_INVALID,
256 "Did you forget to detach?"
257 );
258 }
259}
260
261impl Hash for VmoId {
262 fn hash<H: Hasher>(&self, state: &mut H) {
263 self.id().hash(state);
264 }
265}
266
267#[async_trait]
271pub trait BlockClient: Send + Sync {
272 async fn attach_vmo(&self, vmo: &zx::Vmo) -> Result<VmoId, zx::Status>;
274
275 async fn detach_vmo(&self, vmo_id: VmoId) -> Result<(), zx::Status>;
277
278 async fn read_at(
280 &self,
281 buffer_slice: MutableBufferSlice<'_>,
282 device_offset: u64,
283 ) -> Result<(), zx::Status> {
284 self.read_at_traced(buffer_slice, device_offset, 0).await
285 }
286
287 async fn read_at_traced(
288 &self,
289 buffer_slice: MutableBufferSlice<'_>,
290 device_offset: u64,
291 trace_flow_id: u64,
292 ) -> Result<(), zx::Status>;
293
294 async fn write_at(
296 &self,
297 buffer_slice: BufferSlice<'_>,
298 device_offset: u64,
299 ) -> Result<(), zx::Status> {
300 self.write_at_with_opts_traced(
301 buffer_slice,
302 device_offset,
303 WriteOptions::empty(),
304 NO_TRACE_ID,
305 )
306 .await
307 }
308
309 async fn write_at_with_opts(
310 &self,
311 buffer_slice: BufferSlice<'_>,
312 device_offset: u64,
313 opts: WriteOptions,
314 ) -> Result<(), zx::Status> {
315 self.write_at_with_opts_traced(buffer_slice, device_offset, opts, NO_TRACE_ID).await
316 }
317
318 async fn write_at_with_opts_traced(
319 &self,
320 buffer_slice: BufferSlice<'_>,
321 device_offset: u64,
322 opts: WriteOptions,
323 trace_flow_id: u64,
324 ) -> Result<(), zx::Status>;
325
326 async fn trim(&self, device_range: Range<u64>) -> Result<(), zx::Status> {
328 self.trim_traced(device_range, NO_TRACE_ID).await
329 }
330
331 async fn trim_traced(
332 &self,
333 device_range: Range<u64>,
334 trace_flow_id: u64,
335 ) -> Result<(), zx::Status>;
336
337 fn barrier(&self);
342
343 async fn flush(&self) -> Result<(), zx::Status> {
344 self.flush_traced(NO_TRACE_ID).await
345 }
346
347 async fn flush_traced(&self, trace_flow_id: u64) -> Result<(), zx::Status>;
349
350 async fn close(&self) -> Result<(), zx::Status>;
352
353 fn block_size(&self) -> u32;
355
356 fn block_count(&self) -> u64;
358
359 fn max_transfer_blocks(&self) -> Option<NonZero<u32>>;
361
362 fn block_flags(&self) -> BlockFlags;
364
365 fn is_connected(&self) -> bool;
367}
368
369struct Common {
370 block_size: u32,
371 block_count: u64,
372 max_transfer_blocks: Option<NonZero<u32>>,
373 block_flags: BlockFlags,
374 fifo_state: FifoStateRef,
375 temp_vmo: futures::lock::Mutex<zx::Vmo>,
376 temp_vmo_id: VmoId,
377}
378
379impl Common {
380 fn new(
381 fifo: fasync::Fifo<BlockFifoResponse, BlockFifoRequest>,
382 info: &block::BlockInfo,
383 temp_vmo: zx::Vmo,
384 temp_vmo_id: VmoId,
385 ) -> Self {
386 let fifo_state = Arc::new(Mutex::new(FifoState { fifo: Some(fifo), ..Default::default() }));
387 fasync::Task::spawn(FifoPoller { fifo_state: fifo_state.clone() }).detach();
388 Self {
389 block_size: info.block_size,
390 block_count: info.block_count,
391 max_transfer_blocks: if info.max_transfer_size != MAX_TRANSFER_UNBOUNDED {
392 NonZero::new(info.max_transfer_size / info.block_size)
393 } else {
394 None
395 },
396 block_flags: info.flags,
397 fifo_state,
398 temp_vmo: futures::lock::Mutex::new(temp_vmo),
399 temp_vmo_id,
400 }
401 }
402
403 fn to_blocks(&self, bytes: u64) -> Result<u64, zx::Status> {
404 if bytes % self.block_size as u64 != 0 {
405 Err(zx::Status::INVALID_ARGS)
406 } else {
407 Ok(bytes / self.block_size as u64)
408 }
409 }
410
411 async fn send(&self, mut request: BlockFifoRequest) -> Result<(), zx::Status> {
413 async move {
414 let (request_id, trace_flow_id) = {
415 let mut state = self.fifo_state.lock();
416
417 let mut flags = BlockIoFlag::from_bits_retain(request.command.flags);
418 if BlockOpcode::from_primitive(request.command.opcode) == Some(BlockOpcode::Write)
419 && state.attach_barrier
420 {
421 flags |= BlockIoFlag::PRE_BARRIER;
422 request.command.flags = flags.bits();
423 state.attach_barrier = false;
424 }
425
426 if state.fifo.is_none() {
427 return Err(zx::Status::CANCELED);
429 }
430 trace::duration!(
431 c"storage",
432 c"block_client::send::start",
433 "op" => opcode_str(request.command.opcode)
434 );
435 let request_id = state.next_request_id;
436 state.next_request_id = state.next_request_id.overflowing_add(1).0;
437 assert!(
438 state.map.insert(request_id, RequestState::default()).is_none(),
439 "request id in use!"
440 );
441 request.reqid = request_id;
442 if request.trace_flow_id == NO_TRACE_ID {
443 request.trace_flow_id = generate_trace_flow_id(request_id);
444 }
445 let trace_flow_id = request.trace_flow_id;
446 trace::flow_begin!(c"storage", c"block_client::send", trace_flow_id);
447 state.queue.push_back(request);
448 if let Some(waker) = state.poller_waker.clone() {
449 state.poll_send_requests(&mut Context::from_waker(&waker));
450 }
451 (request_id, trace_flow_id)
452 };
453 ResponseFuture::new(self.fifo_state.clone(), request_id).await?;
454 trace::duration!(c"storage", c"block_client::send::end");
455 trace::flow_end!(c"storage", c"block_client::send", trace_flow_id);
456 Ok(())
457 }
458 .await
459 }
460
461 async fn detach_vmo(&self, vmo_id: VmoId) -> Result<(), zx::Status> {
462 self.send(BlockFifoRequest {
463 command: BlockFifoCommand {
464 opcode: BlockOpcode::CloseVmo.into_primitive(),
465 flags: 0,
466 ..Default::default()
467 },
468 vmoid: vmo_id.into_id(),
469 ..Default::default()
470 })
471 .await
472 }
473
474 async fn read_at(
475 &self,
476 buffer_slice: MutableBufferSlice<'_>,
477 device_offset: u64,
478 trace_flow_id: u64,
479 ) -> Result<(), zx::Status> {
480 match buffer_slice {
481 MutableBufferSlice::VmoId { vmo_id, offset, length } => {
482 self.send(BlockFifoRequest {
483 command: BlockFifoCommand {
484 opcode: BlockOpcode::Read.into_primitive(),
485 flags: 0,
486 ..Default::default()
487 },
488 vmoid: vmo_id.id(),
489 length: self
490 .to_blocks(length)?
491 .try_into()
492 .map_err(|_| zx::Status::INVALID_ARGS)?,
493 vmo_offset: self.to_blocks(offset)?,
494 dev_offset: self.to_blocks(device_offset)?,
495 trace_flow_id,
496 ..Default::default()
497 })
498 .await?
499 }
500 MutableBufferSlice::Memory(mut slice) => {
501 let temp_vmo = self.temp_vmo.lock().await;
502 let mut device_block = self.to_blocks(device_offset)?;
503 loop {
504 let to_do = std::cmp::min(TEMP_VMO_SIZE, slice.len());
505 let block_count = self.to_blocks(to_do as u64)? as u32;
506 self.send(BlockFifoRequest {
507 command: BlockFifoCommand {
508 opcode: BlockOpcode::Read.into_primitive(),
509 flags: 0,
510 ..Default::default()
511 },
512 vmoid: self.temp_vmo_id.id(),
513 length: block_count,
514 vmo_offset: 0,
515 dev_offset: device_block,
516 trace_flow_id,
517 ..Default::default()
518 })
519 .await?;
520 temp_vmo.read(&mut slice[..to_do], 0)?;
521 if to_do == slice.len() {
522 break;
523 }
524 device_block += block_count as u64;
525 slice = &mut slice[to_do..];
526 }
527 }
528 }
529 Ok(())
530 }
531
532 async fn write_at(
533 &self,
534 buffer_slice: BufferSlice<'_>,
535 device_offset: u64,
536 opts: WriteOptions,
537 trace_flow_id: u64,
538 ) -> Result<(), zx::Status> {
539 let mut flags = BlockIoFlag::empty();
540
541 if opts.contains(WriteOptions::FORCE_ACCESS) {
542 flags |= BlockIoFlag::FORCE_ACCESS;
543 }
544
545 if opts.contains(WriteOptions::PRE_BARRIER) {
546 flags |= BlockIoFlag::PRE_BARRIER;
547 }
548
549 match buffer_slice {
550 BufferSlice::VmoId { vmo_id, offset, length } => {
551 self.send(BlockFifoRequest {
552 command: BlockFifoCommand {
553 opcode: BlockOpcode::Write.into_primitive(),
554 flags: flags.bits(),
555 ..Default::default()
556 },
557 vmoid: vmo_id.id(),
558 length: self
559 .to_blocks(length)?
560 .try_into()
561 .map_err(|_| zx::Status::INVALID_ARGS)?,
562 vmo_offset: self.to_blocks(offset)?,
563 dev_offset: self.to_blocks(device_offset)?,
564 trace_flow_id,
565 ..Default::default()
566 })
567 .await?;
568 }
569 BufferSlice::Memory(mut slice) => {
570 let temp_vmo = self.temp_vmo.lock().await;
571 let mut device_block = self.to_blocks(device_offset)?;
572 loop {
573 let to_do = std::cmp::min(TEMP_VMO_SIZE, slice.len());
574 let block_count = self.to_blocks(to_do as u64)? as u32;
575 temp_vmo.write(&slice[..to_do], 0)?;
576 self.send(BlockFifoRequest {
577 command: BlockFifoCommand {
578 opcode: BlockOpcode::Write.into_primitive(),
579 flags: flags.bits(),
580 ..Default::default()
581 },
582 vmoid: self.temp_vmo_id.id(),
583 length: block_count,
584 vmo_offset: 0,
585 dev_offset: device_block,
586 trace_flow_id,
587 ..Default::default()
588 })
589 .await?;
590 if to_do == slice.len() {
591 break;
592 }
593 device_block += block_count as u64;
594 slice = &slice[to_do..];
595 }
596 }
597 }
598 Ok(())
599 }
600
601 async fn trim(&self, device_range: Range<u64>, trace_flow_id: u64) -> Result<(), zx::Status> {
602 let length = self.to_blocks(device_range.end - device_range.start)? as u32;
603 let dev_offset = self.to_blocks(device_range.start)?;
604 self.send(BlockFifoRequest {
605 command: BlockFifoCommand {
606 opcode: BlockOpcode::Trim.into_primitive(),
607 flags: 0,
608 ..Default::default()
609 },
610 vmoid: block_driver::BLOCK_VMOID_INVALID,
611 length,
612 dev_offset,
613 trace_flow_id,
614 ..Default::default()
615 })
616 .await
617 }
618
619 async fn flush(&self, trace_flow_id: u64) -> Result<(), zx::Status> {
620 self.send(BlockFifoRequest {
621 command: BlockFifoCommand {
622 opcode: BlockOpcode::Flush.into_primitive(),
623 flags: 0,
624 ..Default::default()
625 },
626 vmoid: block_driver::BLOCK_VMOID_INVALID,
627 trace_flow_id,
628 ..Default::default()
629 })
630 .await
631 }
632
633 fn barrier(&self) {
634 self.fifo_state.lock().attach_barrier = true;
635 }
636
637 fn block_size(&self) -> u32 {
638 self.block_size
639 }
640
641 fn block_count(&self) -> u64 {
642 self.block_count
643 }
644
645 fn max_transfer_blocks(&self) -> Option<NonZero<u32>> {
646 self.max_transfer_blocks.clone()
647 }
648
649 fn block_flags(&self) -> BlockFlags {
650 self.block_flags
651 }
652
653 fn is_connected(&self) -> bool {
654 self.fifo_state.lock().fifo.is_some()
655 }
656}
657
658impl Drop for Common {
659 fn drop(&mut self) {
660 let _ = self.temp_vmo_id.take().into_id();
663 self.fifo_state.lock().terminate();
664 }
665}
666
667pub struct RemoteBlockClient {
669 session: block::SessionProxy,
670 common: Common,
671}
672
673pub trait AsBlockProxy {
674 fn get_info(&self) -> impl Future<Output = Result<block::BlockGetInfoResult, fidl::Error>>;
675
676 fn open_session(&self, session: ServerEnd<block::SessionMarker>) -> Result<(), fidl::Error>;
677}
678
679impl<T: AsBlockProxy> AsBlockProxy for &T {
680 fn get_info(&self) -> impl Future<Output = Result<block::BlockGetInfoResult, fidl::Error>> {
681 AsBlockProxy::get_info(*self)
682 }
683 fn open_session(&self, session: ServerEnd<block::SessionMarker>) -> Result<(), fidl::Error> {
684 AsBlockProxy::open_session(*self, session)
685 }
686}
687
688macro_rules! impl_as_block_proxy {
689 ($name:ident) => {
690 impl AsBlockProxy for $name {
691 async fn get_info(&self) -> Result<block::BlockGetInfoResult, fidl::Error> {
692 $name::get_info(self).await
693 }
694
695 fn open_session(
696 &self,
697 session: ServerEnd<block::SessionMarker>,
698 ) -> Result<(), fidl::Error> {
699 $name::open_session(self, session)
700 }
701 }
702 };
703}
704
705impl_as_block_proxy!(BlockProxy);
706impl_as_block_proxy!(PartitionProxy);
707impl_as_block_proxy!(VolumeProxy);
708
709impl RemoteBlockClient {
710 pub async fn new(remote: impl AsBlockProxy) -> Result<Self, zx::Status> {
712 let info =
713 remote.get_info().await.map_err(fidl_to_status)?.map_err(zx::Status::from_raw)?;
714 let (session, server) = fidl::endpoints::create_proxy();
715 let () = remote.open_session(server).map_err(fidl_to_status)?;
716 Self::from_session(info, session).await
717 }
718
719 pub async fn from_session(
720 info: block::BlockInfo,
721 session: block::SessionProxy,
722 ) -> Result<Self, zx::Status> {
723 let fifo =
724 session.get_fifo().await.map_err(fidl_to_status)?.map_err(zx::Status::from_raw)?;
725 let fifo = fasync::Fifo::from_fifo(fifo);
726 let temp_vmo = zx::Vmo::create(TEMP_VMO_SIZE as u64)?;
727 let dup = temp_vmo.duplicate_handle(zx::Rights::SAME_RIGHTS)?;
728 let vmo_id =
729 session.attach_vmo(dup).await.map_err(fidl_to_status)?.map_err(zx::Status::from_raw)?;
730 let vmo_id = VmoId::new(vmo_id.id);
731 Ok(RemoteBlockClient { session, common: Common::new(fifo, &info, temp_vmo, vmo_id) })
732 }
733}
734
735#[async_trait]
736impl BlockClient for RemoteBlockClient {
737 async fn attach_vmo(&self, vmo: &zx::Vmo) -> Result<VmoId, zx::Status> {
738 let dup = vmo.duplicate_handle(zx::Rights::SAME_RIGHTS)?;
739 let vmo_id = self
740 .session
741 .attach_vmo(dup)
742 .await
743 .map_err(fidl_to_status)?
744 .map_err(zx::Status::from_raw)?;
745 Ok(VmoId::new(vmo_id.id))
746 }
747
748 async fn detach_vmo(&self, vmo_id: VmoId) -> Result<(), zx::Status> {
749 self.common.detach_vmo(vmo_id).await
750 }
751
752 async fn read_at_traced(
753 &self,
754 buffer_slice: MutableBufferSlice<'_>,
755 device_offset: u64,
756 trace_flow_id: u64,
757 ) -> Result<(), zx::Status> {
758 self.common.read_at(buffer_slice, device_offset, trace_flow_id).await
759 }
760
761 async fn write_at_with_opts_traced(
762 &self,
763 buffer_slice: BufferSlice<'_>,
764 device_offset: u64,
765 opts: WriteOptions,
766 trace_flow_id: u64,
767 ) -> Result<(), zx::Status> {
768 self.common.write_at(buffer_slice, device_offset, opts, trace_flow_id).await
769 }
770
771 async fn trim_traced(&self, range: Range<u64>, trace_flow_id: u64) -> Result<(), zx::Status> {
772 self.common.trim(range, trace_flow_id).await
773 }
774
775 async fn flush_traced(&self, trace_flow_id: u64) -> Result<(), zx::Status> {
776 self.common.flush(trace_flow_id).await
777 }
778
779 fn barrier(&self) {
780 self.common.barrier()
781 }
782
783 async fn close(&self) -> Result<(), zx::Status> {
784 let () =
785 self.session.close().await.map_err(fidl_to_status)?.map_err(zx::Status::from_raw)?;
786 Ok(())
787 }
788
789 fn block_size(&self) -> u32 {
790 self.common.block_size()
791 }
792
793 fn block_count(&self) -> u64 {
794 self.common.block_count()
795 }
796
797 fn max_transfer_blocks(&self) -> Option<NonZero<u32>> {
798 self.common.max_transfer_blocks()
799 }
800
801 fn block_flags(&self) -> BlockFlags {
802 self.common.block_flags()
803 }
804
805 fn is_connected(&self) -> bool {
806 self.common.is_connected()
807 }
808}
809
810pub struct RemoteBlockClientSync {
811 session: block::SessionSynchronousProxy,
812 common: Common,
813}
814
815impl RemoteBlockClientSync {
816 pub fn new(
820 client_end: fidl::endpoints::ClientEnd<block::BlockMarker>,
821 ) -> Result<Self, zx::Status> {
822 let remote = block::BlockSynchronousProxy::new(client_end.into_channel());
823 let info = remote
824 .get_info(zx::MonotonicInstant::INFINITE)
825 .map_err(fidl_to_status)?
826 .map_err(zx::Status::from_raw)?;
827 let (client, server) = fidl::endpoints::create_endpoints();
828 let () = remote.open_session(server).map_err(fidl_to_status)?;
829 let session = block::SessionSynchronousProxy::new(client.into_channel());
830 let fifo = session
831 .get_fifo(zx::MonotonicInstant::INFINITE)
832 .map_err(fidl_to_status)?
833 .map_err(zx::Status::from_raw)?;
834 let temp_vmo = zx::Vmo::create(TEMP_VMO_SIZE as u64)?;
835 let dup = temp_vmo.duplicate_handle(zx::Rights::SAME_RIGHTS)?;
836 let vmo_id = session
837 .attach_vmo(dup, zx::MonotonicInstant::INFINITE)
838 .map_err(fidl_to_status)?
839 .map_err(zx::Status::from_raw)?;
840 let vmo_id = VmoId::new(vmo_id.id);
841
842 let (sender, receiver) = oneshot::channel::<Result<Self, zx::Status>>();
845 std::thread::spawn(move || {
846 let mut executor = fasync::LocalExecutor::new();
847 let fifo = fasync::Fifo::from_fifo(fifo);
848 let common = Common::new(fifo, &info, temp_vmo, vmo_id);
849 let fifo_state = common.fifo_state.clone();
850 let _ = sender.send(Ok(RemoteBlockClientSync { session, common }));
851 executor.run_singlethreaded(FifoPoller { fifo_state });
852 });
853 block_on(receiver).map_err(|_| zx::Status::CANCELED)?
854 }
855
856 pub fn attach_vmo(&self, vmo: &zx::Vmo) -> Result<VmoId, zx::Status> {
857 let dup = vmo.duplicate_handle(zx::Rights::SAME_RIGHTS)?;
858 let vmo_id = self
859 .session
860 .attach_vmo(dup, zx::MonotonicInstant::INFINITE)
861 .map_err(fidl_to_status)?
862 .map_err(zx::Status::from_raw)?;
863 Ok(VmoId::new(vmo_id.id))
864 }
865
866 pub fn detach_vmo(&self, vmo_id: VmoId) -> Result<(), zx::Status> {
867 block_on(self.common.detach_vmo(vmo_id))
868 }
869
870 pub fn read_at(
871 &self,
872 buffer_slice: MutableBufferSlice<'_>,
873 device_offset: u64,
874 ) -> Result<(), zx::Status> {
875 block_on(self.common.read_at(buffer_slice, device_offset, NO_TRACE_ID))
876 }
877
878 pub fn write_at(
879 &self,
880 buffer_slice: BufferSlice<'_>,
881 device_offset: u64,
882 ) -> Result<(), zx::Status> {
883 block_on(self.common.write_at(
884 buffer_slice,
885 device_offset,
886 WriteOptions::empty(),
887 NO_TRACE_ID,
888 ))
889 }
890
891 pub fn flush(&self) -> Result<(), zx::Status> {
892 block_on(self.common.flush(NO_TRACE_ID))
893 }
894
895 pub fn close(&self) -> Result<(), zx::Status> {
896 let () = self
897 .session
898 .close(zx::MonotonicInstant::INFINITE)
899 .map_err(fidl_to_status)?
900 .map_err(zx::Status::from_raw)?;
901 Ok(())
902 }
903
904 pub fn block_size(&self) -> u32 {
905 self.common.block_size()
906 }
907
908 pub fn block_count(&self) -> u64 {
909 self.common.block_count()
910 }
911
912 pub fn is_connected(&self) -> bool {
913 self.common.is_connected()
914 }
915}
916
917impl Drop for RemoteBlockClientSync {
918 fn drop(&mut self) {
919 let _ = self.close();
921 }
922}
923
924struct FifoPoller {
926 fifo_state: FifoStateRef,
927}
928
929impl Future for FifoPoller {
930 type Output = ();
931
932 fn poll(self: Pin<&mut Self>, context: &mut Context<'_>) -> Poll<Self::Output> {
933 let mut state_lock = self.fifo_state.lock();
934 let state = state_lock.deref_mut(); if state.poll_send_requests(context) {
938 return Poll::Ready(());
939 }
940
941 let fifo = state.fifo.as_ref().unwrap(); loop {
944 let mut response = MaybeUninit::uninit();
945 match fifo.try_read(context, &mut response) {
946 Poll::Pending => {
947 state.poller_waker = Some(context.waker().clone());
948 return Poll::Pending;
949 }
950 Poll::Ready(Ok(_)) => {
951 let response = unsafe { response.assume_init() };
952 let request_id = response.reqid;
953 if let Some(request_state) = state.map.get_mut(&request_id) {
955 request_state.result.replace(zx::Status::from_raw(response.status));
956 if let Some(waker) = request_state.waker.take() {
957 waker.wake();
958 }
959 }
960 }
961 Poll::Ready(Err(_)) => {
962 state.terminate();
963 return Poll::Ready(());
964 }
965 }
966 }
967 }
968}
969
970#[cfg(test)]
971mod tests {
972 use super::{
973 BlockClient, BlockFifoRequest, BlockFifoResponse, BufferSlice, MutableBufferSlice,
974 RemoteBlockClient, RemoteBlockClientSync, WriteOptions,
975 };
976 use block_server::{BlockServer, DeviceInfo, PartitionInfo};
977 use fidl::endpoints::RequestStream as _;
978 use futures::future::{AbortHandle, Abortable, TryFutureExt as _};
979 use futures::join;
980 use futures::stream::futures_unordered::FuturesUnordered;
981 use futures::stream::StreamExt as _;
982 use ramdevice_client::RamdiskClient;
983 use std::borrow::Cow;
984 use std::num::NonZero;
985 use std::sync::atomic::{AtomicBool, Ordering};
986 use std::sync::Arc;
987 use {fidl_fuchsia_hardware_block as block, fuchsia_async as fasync};
988
989 const RAMDISK_BLOCK_SIZE: u64 = 1024;
990 const RAMDISK_BLOCK_COUNT: u64 = 1024;
991
992 pub async fn make_ramdisk() -> (RamdiskClient, block::BlockProxy, RemoteBlockClient) {
993 let ramdisk = RamdiskClient::create(RAMDISK_BLOCK_SIZE, RAMDISK_BLOCK_COUNT)
994 .await
995 .expect("RamdiskClient::create failed");
996 let client_end = ramdisk.open().expect("ramdisk.open failed");
997 let proxy = client_end.into_proxy();
998 let block_client = RemoteBlockClient::new(proxy).await.expect("new failed");
999 assert_eq!(block_client.block_size(), 1024);
1000 let client_end = ramdisk.open().expect("ramdisk.open failed");
1001 let proxy = client_end.into_proxy();
1002 (ramdisk, proxy, block_client)
1003 }
1004
1005 #[fuchsia::test]
1006 async fn test_against_ram_disk() {
1007 let (_ramdisk, _block_proxy, block_client) = make_ramdisk().await;
1008
1009 let vmo = zx::Vmo::create(131072).expect("Vmo::create failed");
1010 vmo.write(b"hello", 5).expect("vmo.write failed");
1011 let vmo_id = block_client.attach_vmo(&vmo).await.expect("attach_vmo failed");
1012 block_client
1013 .write_at(BufferSlice::new_with_vmo_id(&vmo_id, 0, 1024), 0)
1014 .await
1015 .expect("write_at failed");
1016 block_client
1017 .read_at(MutableBufferSlice::new_with_vmo_id(&vmo_id, 1024, 2048), 0)
1018 .await
1019 .expect("read_at failed");
1020 let mut buf: [u8; 5] = Default::default();
1021 vmo.read(&mut buf, 1029).expect("vmo.read failed");
1022 assert_eq!(&buf, b"hello");
1023 block_client.detach_vmo(vmo_id).await.expect("detach_vmo failed");
1024 }
1025
1026 #[fuchsia::test]
1027 async fn test_alignment() {
1028 let (_ramdisk, _block_proxy, block_client) = make_ramdisk().await;
1029 let vmo = zx::Vmo::create(131072).expect("Vmo::create failed");
1030 let vmo_id = block_client.attach_vmo(&vmo).await.expect("attach_vmo failed");
1031 block_client
1032 .write_at(BufferSlice::new_with_vmo_id(&vmo_id, 0, 1024), 1)
1033 .await
1034 .expect_err("expected failure due to bad alignment");
1035 block_client.detach_vmo(vmo_id).await.expect("detach_vmo failed");
1036 }
1037
1038 #[fuchsia::test]
1039 async fn test_parallel_io() {
1040 let (_ramdisk, _block_proxy, block_client) = make_ramdisk().await;
1041 let vmo = zx::Vmo::create(131072).expect("Vmo::create failed");
1042 let vmo_id = block_client.attach_vmo(&vmo).await.expect("attach_vmo failed");
1043 let mut reads = Vec::new();
1044 for _ in 0..1024 {
1045 reads.push(
1046 block_client
1047 .read_at(MutableBufferSlice::new_with_vmo_id(&vmo_id, 0, 1024), 0)
1048 .inspect_err(|e| panic!("read should have succeeded: {}", e)),
1049 );
1050 }
1051 futures::future::join_all(reads).await;
1052 block_client.detach_vmo(vmo_id).await.expect("detach_vmo failed");
1053 }
1054
1055 #[fuchsia::test]
1056 async fn test_closed_device() {
1057 let (ramdisk, _block_proxy, block_client) = make_ramdisk().await;
1058 let vmo = zx::Vmo::create(131072).expect("Vmo::create failed");
1059 let vmo_id = block_client.attach_vmo(&vmo).await.expect("attach_vmo failed");
1060 let mut reads = Vec::new();
1061 for _ in 0..1024 {
1062 reads.push(
1063 block_client.read_at(MutableBufferSlice::new_with_vmo_id(&vmo_id, 0, 1024), 0),
1064 );
1065 }
1066 assert!(block_client.is_connected());
1067 let _ = futures::join!(futures::future::join_all(reads), async {
1068 ramdisk.destroy().await.expect("ramdisk.destroy failed")
1069 });
1070 while block_client
1072 .read_at(MutableBufferSlice::new_with_vmo_id(&vmo_id, 0, 1024), 0)
1073 .await
1074 .is_ok()
1075 {}
1076
1077 while block_client.is_connected() {
1080 fasync::Timer::new(fasync::MonotonicInstant::after(
1082 zx::MonotonicDuration::from_millis(500),
1083 ))
1084 .await;
1085 }
1086
1087 assert_eq!(block_client.is_connected(), false);
1089 let _ = block_client.detach_vmo(vmo_id).await;
1090 }
1091
1092 #[fuchsia::test]
1093 async fn test_cancelled_reads() {
1094 let (_ramdisk, _block_proxy, block_client) = make_ramdisk().await;
1095 let vmo = zx::Vmo::create(131072).expect("Vmo::create failed");
1096 let vmo_id = block_client.attach_vmo(&vmo).await.expect("attach_vmo failed");
1097 {
1098 let mut reads = FuturesUnordered::new();
1099 for _ in 0..1024 {
1100 reads.push(
1101 block_client.read_at(MutableBufferSlice::new_with_vmo_id(&vmo_id, 0, 1024), 0),
1102 );
1103 }
1104 for _ in 0..500 {
1106 reads.next().await;
1107 }
1108 }
1109 block_client.detach_vmo(vmo_id).await.expect("detach_vmo failed");
1110 }
1111
1112 #[fuchsia::test]
1113 async fn test_parallel_large_read_and_write_with_memory_succeds() {
1114 let (_ramdisk, _block_proxy, block_client) = make_ramdisk().await;
1115 let block_client_ref = &block_client;
1116 let test_one = |offset, len, fill| async move {
1117 let buf = vec![fill; len];
1118 block_client_ref.write_at(buf[..].into(), offset).await.expect("write_at failed");
1119 let mut read_buf = vec![0u8; len + 2 * RAMDISK_BLOCK_SIZE as usize];
1121 block_client_ref
1122 .read_at(read_buf.as_mut_slice().into(), offset - RAMDISK_BLOCK_SIZE)
1123 .await
1124 .expect("read_at failed");
1125 assert_eq!(
1126 &read_buf[0..RAMDISK_BLOCK_SIZE as usize],
1127 &[0; RAMDISK_BLOCK_SIZE as usize][..]
1128 );
1129 assert_eq!(
1130 &read_buf[RAMDISK_BLOCK_SIZE as usize..RAMDISK_BLOCK_SIZE as usize + len],
1131 &buf[..]
1132 );
1133 assert_eq!(
1134 &read_buf[RAMDISK_BLOCK_SIZE as usize + len..],
1135 &[0; RAMDISK_BLOCK_SIZE as usize][..]
1136 );
1137 };
1138 const WRITE_LEN: usize = super::TEMP_VMO_SIZE * 3 + RAMDISK_BLOCK_SIZE as usize;
1139 join!(
1140 test_one(RAMDISK_BLOCK_SIZE, WRITE_LEN, 0xa3u8),
1141 test_one(2 * RAMDISK_BLOCK_SIZE + WRITE_LEN as u64, WRITE_LEN, 0x7fu8)
1142 );
1143 }
1144
1145 struct FakeBlockServer<'a> {
1149 server_channel: Option<fidl::endpoints::ServerEnd<block::BlockMarker>>,
1150 channel_handler: Box<dyn Fn(&block::SessionRequest) -> bool + 'a>,
1151 fifo_handler: Box<dyn Fn(BlockFifoRequest) -> BlockFifoResponse + 'a>,
1152 }
1153
1154 impl<'a> FakeBlockServer<'a> {
1155 fn new(
1167 server_channel: fidl::endpoints::ServerEnd<block::BlockMarker>,
1168 channel_handler: impl Fn(&block::SessionRequest) -> bool + 'a,
1169 fifo_handler: impl Fn(BlockFifoRequest) -> BlockFifoResponse + 'a,
1170 ) -> FakeBlockServer<'a> {
1171 FakeBlockServer {
1172 server_channel: Some(server_channel),
1173 channel_handler: Box::new(channel_handler),
1174 fifo_handler: Box::new(fifo_handler),
1175 }
1176 }
1177
1178 async fn run(&mut self) {
1180 let server = self.server_channel.take().unwrap();
1181
1182 let (server_fifo, client_fifo) =
1184 zx::Fifo::<BlockFifoRequest, BlockFifoResponse>::create(16)
1185 .expect("Fifo::create failed");
1186 let maybe_server_fifo = fuchsia_sync::Mutex::new(Some(client_fifo));
1187
1188 let (fifo_future_abort, fifo_future_abort_registration) = AbortHandle::new_pair();
1189 let fifo_future = Abortable::new(
1190 async {
1191 let mut fifo = fasync::Fifo::from_fifo(server_fifo);
1192 let (mut reader, mut writer) = fifo.async_io();
1193 let mut request = BlockFifoRequest::default();
1194 loop {
1195 match reader.read_entries(&mut request).await {
1196 Ok(1) => {}
1197 Err(zx::Status::PEER_CLOSED) => break,
1198 Err(e) => panic!("read_entry failed {:?}", e),
1199 _ => unreachable!(),
1200 };
1201
1202 let response = self.fifo_handler.as_ref()(request);
1203 writer
1204 .write_entries(std::slice::from_ref(&response))
1205 .await
1206 .expect("write_entries failed");
1207 }
1208 },
1209 fifo_future_abort_registration,
1210 );
1211
1212 let channel_future = async {
1213 server
1214 .into_stream()
1215 .for_each_concurrent(None, |request| async {
1216 let request = request.expect("unexpected fidl error");
1217
1218 match request {
1219 block::BlockRequest::GetInfo { responder } => {
1220 responder
1221 .send(Ok(&block::BlockInfo {
1222 block_count: 1024,
1223 block_size: 512,
1224 max_transfer_size: 1024 * 1024,
1225 flags: block::Flag::empty(),
1226 }))
1227 .expect("send failed");
1228 }
1229 block::BlockRequest::OpenSession { session, control_handle: _ } => {
1230 let stream = session.into_stream();
1231 stream
1232 .for_each(|request| async {
1233 let request = request.expect("unexpected fidl error");
1234 if self.channel_handler.as_ref()(&request) {
1237 return;
1238 }
1239 match request {
1240 block::SessionRequest::GetFifo { responder } => {
1241 match maybe_server_fifo.lock().take() {
1242 Some(fifo) => {
1243 responder.send(Ok(fifo.downcast()))
1244 }
1245 None => responder.send(Err(
1246 zx::Status::NO_RESOURCES.into_raw(),
1247 )),
1248 }
1249 .expect("send failed")
1250 }
1251 block::SessionRequest::AttachVmo {
1252 vmo: _,
1253 responder,
1254 } => responder
1255 .send(Ok(&block::VmoId { id: 1 }))
1256 .expect("send failed"),
1257 block::SessionRequest::Close { responder } => {
1258 fifo_future_abort.abort();
1259 responder.send(Ok(())).expect("send failed")
1260 }
1261 }
1262 })
1263 .await
1264 }
1265 _ => panic!("Unexpected message"),
1266 }
1267 })
1268 .await;
1269 };
1270
1271 let _result = join!(fifo_future, channel_future);
1272 }
1274 }
1275
1276 #[fuchsia::test]
1277 async fn test_block_close_is_called() {
1278 let close_called = fuchsia_sync::Mutex::new(false);
1279 let (client_end, server) = fidl::endpoints::create_endpoints::<block::BlockMarker>();
1280
1281 std::thread::spawn(move || {
1282 let _block_client =
1283 RemoteBlockClientSync::new(client_end).expect("RemoteBlockClientSync::new failed");
1284 });
1286
1287 let channel_handler = |request: &block::SessionRequest| -> bool {
1288 if let block::SessionRequest::Close { .. } = request {
1289 *close_called.lock() = true;
1290 }
1291 false
1292 };
1293 FakeBlockServer::new(server, channel_handler, |_| unreachable!()).run().await;
1294
1295 assert!(*close_called.lock());
1297 }
1298
1299 #[fuchsia::test]
1300 async fn test_block_flush_is_called() {
1301 let (proxy, stream) = fidl::endpoints::create_proxy_and_stream::<block::BlockMarker>();
1302
1303 struct Interface {
1304 flush_called: Arc<AtomicBool>,
1305 }
1306 impl block_server::async_interface::Interface for Interface {
1307 async fn get_info(&self) -> Result<Cow<'_, DeviceInfo>, zx::Status> {
1308 Ok(Cow::Owned(DeviceInfo::Partition(PartitionInfo {
1309 device_flags: fidl_fuchsia_hardware_block::Flag::empty(),
1310 max_transfer_blocks: None,
1311 block_range: Some(0..1000),
1312 type_guid: [0; 16],
1313 instance_guid: [0; 16],
1314 name: "foo".to_string(),
1315 flags: 0,
1316 })))
1317 }
1318
1319 async fn read(
1320 &self,
1321 _device_block_offset: u64,
1322 _block_count: u32,
1323 _vmo: &Arc<zx::Vmo>,
1324 _vmo_offset: u64,
1325 _trace_flow_id: Option<NonZero<u64>>,
1326 ) -> Result<(), zx::Status> {
1327 unreachable!();
1328 }
1329
1330 fn barrier(&self) -> Result<(), zx::Status> {
1331 unreachable!()
1332 }
1333
1334 async fn write(
1335 &self,
1336 _device_block_offset: u64,
1337 _block_count: u32,
1338 _vmo: &Arc<zx::Vmo>,
1339 _vmo_offset: u64,
1340 _opts: WriteOptions,
1341 _trace_flow_id: Option<NonZero<u64>>,
1342 ) -> Result<(), zx::Status> {
1343 unreachable!();
1344 }
1345
1346 async fn flush(&self, _trace_flow_id: Option<NonZero<u64>>) -> Result<(), zx::Status> {
1347 self.flush_called.store(true, Ordering::Relaxed);
1348 Ok(())
1349 }
1350
1351 async fn trim(
1352 &self,
1353 _device_block_offset: u64,
1354 _block_count: u32,
1355 _trace_flow_id: Option<NonZero<u64>>,
1356 ) -> Result<(), zx::Status> {
1357 unreachable!();
1358 }
1359 }
1360
1361 let flush_called = Arc::new(AtomicBool::new(false));
1362
1363 futures::join!(
1364 async {
1365 let block_client = RemoteBlockClient::new(proxy).await.expect("new failed");
1366
1367 block_client.flush().await.expect("flush failed");
1368 },
1369 async {
1370 let block_server = BlockServer::new(
1371 512,
1372 Arc::new(Interface { flush_called: flush_called.clone() }),
1373 );
1374 block_server.handle_requests(stream.cast_stream()).await.unwrap();
1375 }
1376 );
1377
1378 assert!(flush_called.load(Ordering::Relaxed));
1379 }
1380
1381 #[fuchsia::test]
1382 async fn test_trace_flow_ids_set() {
1383 let (proxy, server) = fidl::endpoints::create_proxy();
1384
1385 futures::join!(
1386 async {
1387 let block_client = RemoteBlockClient::new(proxy).await.expect("new failed");
1388 block_client.flush().await.expect("flush failed");
1389 },
1390 async {
1391 let flow_id: fuchsia_sync::Mutex<Option<u64>> = fuchsia_sync::Mutex::new(None);
1392 let fifo_handler = |request: BlockFifoRequest| -> BlockFifoResponse {
1393 if request.trace_flow_id > 0 {
1394 *flow_id.lock() = Some(request.trace_flow_id);
1395 }
1396 BlockFifoResponse {
1397 status: zx::Status::OK.into_raw(),
1398 reqid: request.reqid,
1399 ..Default::default()
1400 }
1401 };
1402 FakeBlockServer::new(server, |_| false, fifo_handler).run().await;
1403 assert!(flow_id.lock().is_some());
1405 }
1406 );
1407 }
1408}