[NFSD]
[reactos.git] / reactos / base / services / nfsd / pnfs_io.c
/* NFSv4.1 client for Windows
 * Copyright © 2012 The Regents of the University of Michigan
 *
 * Olga Kornievskaia <aglo@umich.edu>
 * Casey Bodley <cbodley@umich.edu>
 *
 * This library is free software; you can redistribute it and/or modify it
 * under the terms of the GNU Lesser General Public License as published by
 * the Free Software Foundation; either version 2.1 of the License, or (at
 * your option) any later version.
 *
 * This library is distributed in the hope that it will be useful, but
 * without any warranty; without even the implied warranty of merchantability
 * or fitness for a particular purpose. See the GNU Lesser General Public
 * License for more details.
 *
 * You should have received a copy of the GNU Lesser General Public License
 * along with this library; if not, write to the Free Software Foundation,
 * Inc., 51 Franklin Street, Fifth Floor, Boston, MA
 */
21
22 #include <stdio.h>
23 #include <process.h>
24
25 #include "nfs41_ops.h"
26 #include "util.h"
27 #include "daemon_debug.h"
28
29
/* verbosity level passed to dprintf() for the pnfs io logging in this file */
#define IOLVL 2

/* recover the pnfs_file_layout that owns a list position by applying
 * list_container() to its embedded layout.entry member */
#define file_layout_entry(pos) list_container(pos, pnfs_file_layout, layout.entry)
33
34 typedef struct __pnfs_io_pattern {
35 struct __pnfs_io_thread *threads;
36 nfs41_root *root;
37 nfs41_path_fh *meta_file;
38 const stateid_arg *stateid;
39 pnfs_layout_state *state;
40 unsigned char *buffer;
41 uint64_t offset_start;
42 uint64_t offset_end;
43 uint32_t count;
44 uint32_t default_lease;
45 } pnfs_io_pattern;
46
47 typedef struct __pnfs_io_thread {
48 nfs41_write_verf verf;
49 pnfs_io_pattern *pattern;
50 pnfs_file_layout *layout;
51 nfs41_path_fh *file;
52 uint64_t offset;
53 uint32_t id;
54 enum stable_how4 stable;
55 } pnfs_io_thread;
56
/* one contiguous piece of io within a single stripe unit, as produced by
 * stripe_next_unit() and thread_next_unit() */
typedef struct __pnfs_io_unit {
    unsigned char   *buffer;    /* where in the caller's buffer the data lives */
    uint64_t        offset;     /* offset to use in the READ/WRITE; converted to
                                 * a dense offset for dense layouts */
    uint64_t        length;     /* number of bytes in this unit */
    uint32_t        stripeid;   /* NOTE(review): not assigned by the functions
                                 * visible in this part of the file */
    uint32_t        serverid;   /* NOTE(review): likewise not assigned here */
} pnfs_io_unit;
64
65 typedef uint32_t (WINAPI *pnfs_io_thread_fn)(void*);
66
67
68 static enum pnfs_status stripe_next_unit(
69 IN const pnfs_file_layout *layout,
70 IN uint32_t stripeid,
71 IN uint64_t *position,
72 IN uint64_t offset_end,
73 OUT pnfs_io_unit *io);
74
75 /* 13.4.2. Interpreting the File Layout Using Sparse Packing
76 * http://tools.ietf.org/html/rfc5661#section-13.4.2 */
77
78 static enum pnfs_status get_sparse_fh(
79 IN pnfs_file_layout *layout,
80 IN nfs41_path_fh *meta_file,
81 IN uint32_t stripeid,
82 OUT nfs41_path_fh **file_out)
83 {
84 const uint32_t filehandle_count = layout->filehandles.count;
85 const uint32_t server_count = layout->device->servers.count;
86 enum pnfs_status status = PNFS_SUCCESS;
87
88 if (filehandle_count == server_count) {
89 const uint32_t serverid = data_server_index(layout->device, stripeid);
90 *file_out = &layout->filehandles.arr[serverid];
91 } else if (filehandle_count == 1) {
92 *file_out = &layout->filehandles.arr[0];
93 } else if (filehandle_count == 0) {
94 *file_out = meta_file;
95 } else {
96 eprintf("invalid sparse layout! has %u file handles "
97 "and %u servers\n", filehandle_count, server_count);
98 status = PNFSERR_INVALID_FH_LIST;
99 }
100 return status;
101 }
102
103 /* 13.4.3. Interpreting the File Layout Using Dense Packing
104 * http://tools.ietf.org/html/rfc5661#section-13.4.3 */
105
106 static enum pnfs_status get_dense_fh(
107 IN pnfs_file_layout *layout,
108 IN uint32_t stripeid,
109 OUT nfs41_path_fh **file_out)
110 {
111 const uint32_t filehandle_count = layout->filehandles.count;
112 const uint32_t stripe_count = layout->device->stripes.count;
113 enum pnfs_status status = PNFS_SUCCESS;
114
115 if (filehandle_count == stripe_count) {
116 *file_out = &layout->filehandles.arr[stripeid];
117 } else {
118 eprintf("invalid dense layout! has %u file handles "
119 "and %u stripes\n", filehandle_count, stripe_count);
120 status = PNFSERR_INVALID_FH_LIST;
121 }
122 return status;
123 }
124
125 static __inline bool_t layout_compatible(
126 IN const pnfs_layout *layout,
127 IN enum pnfs_iomode iomode,
128 IN uint64_t position)
129 {
130 return layout->iomode >= iomode
131 && layout->offset <= position
132 && position < layout->offset + layout->length;
133 }
134
135 /* count stripes for all layout segments that intersect the range
136 * and have not been covered by previous segments */
137 static uint32_t thread_count(
138 IN pnfs_layout_state *state,
139 IN enum pnfs_iomode iomode,
140 IN uint64_t offset,
141 IN uint64_t length)
142 {
143 uint64_t position = offset;
144 struct list_entry *entry;
145 uint32_t count = 0;
146
147 list_for_each(entry, &state->layouts) {
148 pnfs_file_layout *layout = file_layout_entry(entry);
149
150 if (!layout_compatible(&layout->layout, iomode, position))
151 continue;
152
153 position = layout->layout.offset + layout->layout.length;
154 count += layout->device->stripes.count;
155 }
156 return count;
157 }
158
159 static enum pnfs_status thread_init(
160 IN pnfs_io_pattern *pattern,
161 IN pnfs_io_thread *thread,
162 IN pnfs_file_layout *layout,
163 IN uint32_t stripeid,
164 IN uint64_t offset)
165 {
166 thread->pattern = pattern;
167 thread->layout = layout;
168 thread->stable = FILE_SYNC4;
169 thread->offset = offset;
170 thread->id = stripeid;
171
172 return is_dense(layout) ? get_dense_fh(layout, stripeid, &thread->file)
173 : get_sparse_fh(layout, pattern->meta_file, stripeid, &thread->file);
174 }
175
176 static enum pnfs_status pattern_threads_init(
177 IN pnfs_io_pattern *pattern,
178 IN enum pnfs_iomode iomode,
179 IN uint64_t offset,
180 IN uint64_t length)
181 {
182 pnfs_io_unit io;
183 uint64_t position = offset;
184 struct list_entry *entry;
185 uint32_t s, t = 0;
186 enum pnfs_status status = PNFS_SUCCESS;
187
188 list_for_each(entry, &pattern->state->layouts) {
189 pnfs_file_layout *layout = file_layout_entry(entry);
190
191 if (!layout_compatible(&layout->layout, iomode, position))
192 continue;
193
194 for (s = 0; s < layout->device->stripes.count; s++) {
195 uint64_t off = position;
196
197 /* does the range contain this stripe? */
198 status = stripe_next_unit(layout, s, &off, offset + length, &io);
199 if (status != PNFS_PENDING)
200 continue;
201
202 if (t >= pattern->count) { /* miscounted threads needed? */
203 status = PNFSERR_NO_LAYOUT;
204 goto out;
205 }
206
207 status = thread_init(pattern, &pattern->threads[t++], layout, s, off);
208 if (status)
209 goto out;
210 }
211 position = layout->layout.offset + layout->layout.length;
212 }
213
214 if (position < offset + length) {
215 /* unable to satisfy the entire range */
216 status = PNFSERR_NO_LAYOUT;
217 goto out;
218 }
219
220 /* update the pattern with the actual number of threads used */
221 pattern->count = t;
222 out:
223 return status;
224 }
225
226 static enum pnfs_status pattern_init(
227 IN pnfs_io_pattern *pattern,
228 IN nfs41_root *root,
229 IN nfs41_path_fh *meta_file,
230 IN const stateid_arg *stateid,
231 IN pnfs_layout_state *state,
232 IN unsigned char *buffer,
233 IN enum pnfs_iomode iomode,
234 IN uint64_t offset,
235 IN uint64_t length,
236 IN uint32_t default_lease)
237 {
238 enum pnfs_status status;
239
240 /* calculate an upper bound on the number of threads to allocate */
241 pattern->count = thread_count(state, iomode, offset, length);
242 pattern->threads = calloc(pattern->count, sizeof(pnfs_io_thread));
243 if (pattern->threads == NULL) {
244 status = PNFSERR_RESOURCES;
245 goto out;
246 }
247
248 /* information shared between threads */
249 pattern->root = root;
250 pattern->meta_file = meta_file;
251 pattern->stateid = stateid;
252 pattern->state = state;
253 pattern->buffer = buffer;
254 pattern->offset_start = offset;
255 pattern->offset_end = offset + length;
256 pattern->default_lease = default_lease;
257
258 /* initialize a thread for every stripe necessary to cover the range */
259 status = pattern_threads_init(pattern, iomode, offset, length);
260 if (status)
261 goto out_err_free;
262
263 /* take a reference on the layout so we don't return it during io */
264 pnfs_layout_io_start(state);
265 out:
266 return status;
267
268 out_err_free:
269 free(pattern->threads);
270 pattern->threads = NULL;
271 goto out;
272 }
273
274 static void pattern_free(
275 IN pnfs_io_pattern *pattern)
276 {
277 /* inform the layout that our io is finished */
278 pnfs_layout_io_finished(pattern->state);
279 free(pattern->threads);
280 }
281
/* remainder of dividend / divisor. both operands are unsigned, so the
 * result of % is already in [0, divisor) and needs no sign correction.
 * divisor must be nonzero */
static __inline uint64_t positive_remainder(
    IN uint64_t dividend,
    IN uint32_t divisor)
{
    return dividend % divisor;
}
289
290 /* return the next unit of the given stripeid */
291 static enum pnfs_status stripe_next_unit(
292 IN const pnfs_file_layout *layout,
293 IN uint32_t stripeid,
294 IN uint64_t *position,
295 IN uint64_t offset_end,
296 OUT pnfs_io_unit *io)
297 {
298 const uint32_t unit_size = layout_unit_size(layout);
299 const uint32_t stripe_count = layout->device->stripes.count;
300 uint64_t sui = stripe_unit_number(layout, *position, unit_size);
301
302 /* advance to the desired stripeid */
303 sui += abs(stripeid - stripe_index(layout, sui, stripe_count));
304
305 io->offset = stripe_unit_offset(layout, sui, unit_size);
306 if (io->offset < *position) /* don't start before position */
307 io->offset = *position;
308 else
309 *position = io->offset;
310
311 io->length = stripe_unit_offset(layout, sui + 1, unit_size);
312 if (io->length > offset_end) /* don't end past offset_end */
313 io->length = offset_end;
314
315 if (io->offset >= io->length) /* nothing to do, return success */
316 return PNFS_SUCCESS;
317
318 io->length -= io->offset;
319
320 if (is_dense(layout)) {
321 const uint64_t rel_offset = io->offset - layout->pattern_offset;
322 const uint64_t remainder = positive_remainder(rel_offset, unit_size);
323 const uint32_t stride = unit_size * stripe_count;
324
325 io->offset = (rel_offset / stride) * unit_size + remainder;
326 }
327 return PNFS_PENDING;
328 }
329
330 static enum pnfs_status thread_next_unit(
331 IN pnfs_io_thread *thread,
332 OUT pnfs_io_unit *io)
333 {
334 pnfs_io_pattern *pattern = thread->pattern;
335 pnfs_layout_state *state = pattern->state;
336 enum pnfs_status status;
337
338 AcquireSRWLockShared(&state->lock);
339
340 /* stop io if the layout is recalled */
341 status = pnfs_layout_recall_status(state, &thread->layout->layout);
342 if (status)
343 goto out_unlock;
344
345 status = stripe_next_unit(thread->layout, thread->id,
346 &thread->offset, pattern->offset_end, io);
347 if (status == PNFS_PENDING)
348 io->buffer = pattern->buffer + thread->offset - pattern->offset_start;
349
350 out_unlock:
351 ReleaseSRWLockShared(&state->lock);
352 return status;
353 }
354
355 static enum pnfs_status thread_data_server(
356 IN pnfs_io_thread *thread,
357 OUT pnfs_data_server **server_out)
358 {
359 pnfs_file_device *device = thread->layout->device;
360 const uint32_t serverid = data_server_index(device, thread->id);
361
362 if (serverid >= device->servers.count)
363 return PNFSERR_INVALID_DS_INDEX;
364
365 *server_out = &device->servers.arr[serverid];
366 return PNFS_SUCCESS;
367 }
368
369 static enum pnfs_status pattern_join(
370 IN HANDLE *threads,
371 IN DWORD count)
372 {
373 DWORD status;
374 /* WaitForMultipleObjects() supports a maximum of 64 objects */
375 while (count) {
376 const DWORD n = min(count, MAXIMUM_WAIT_OBJECTS);
377 status = WaitForMultipleObjects(n, threads, TRUE, INFINITE);
378 if (status != WAIT_OBJECT_0)
379 return PNFSERR_RESOURCES;
380
381 count -= n;
382 threads += n;
383 }
384 return PNFS_SUCCESS;
385 }
386
387 static enum pnfs_status pattern_fork(
388 IN pnfs_io_pattern *pattern,
389 IN pnfs_io_thread_fn thread_fn)
390 {
391 HANDLE *threads;
392 uint32_t i;
393 enum pnfs_status status = PNFS_SUCCESS;
394
395 if (pattern->count == 0)
396 goto out;
397
398 if (pattern->count == 1) {
399 /* no need to fork if there's only 1 thread */
400 status = (enum pnfs_status)thread_fn(pattern->threads);
401 goto out;
402 }
403
404 /* create a thread for each unit that has actual io */
405 threads = calloc(pattern->count, sizeof(HANDLE));
406 if (threads == NULL) {
407 status = PNFSERR_RESOURCES;
408 goto out;
409 }
410
411 for (i = 0; i < pattern->count; i++) {
412 threads[i] = (HANDLE)_beginthreadex(NULL, 0,
413 thread_fn, &pattern->threads[i], 0, NULL);
414 if (threads[i] == NULL) {
415 eprintf("_beginthreadex() failed with %d\n", GetLastError());
416 pattern->count = i; /* join any threads already started */
417 break;
418 }
419 }
420
421 /* wait on all threads to finish */
422 status = pattern_join(threads, pattern->count);
423 if (status) {
424 eprintf("pattern_join() failed with %s\n", pnfs_error_string(status));
425 goto out;
426 }
427
428 for (i = 0; i < pattern->count; i++) {
429 /* keep track of the most severe error returned by a thread */
430 DWORD exitcode;
431 if (GetExitCodeThread(threads[i], &exitcode))
432 status = max(status, (enum pnfs_status)exitcode);
433
434 CloseHandle(threads[i]);
435 }
436
437 free(threads);
438 out:
439 return status;
440 }
441
442 static uint64_t pattern_bytes_transferred(
443 IN pnfs_io_pattern *pattern,
444 OUT OPTIONAL enum stable_how4 *stable)
445 {
446 uint64_t lowest_offset = pattern->offset_end;
447 uint32_t i;
448
449 if (stable) *stable = FILE_SYNC4;
450
451 for (i = 0; i < pattern->count; i++) {
452 lowest_offset = min(lowest_offset, pattern->threads[i].offset);
453 if (stable) *stable = min(*stable, pattern->threads[i].stable);
454 }
455 return lowest_offset - pattern->offset_start;
456 }
457
458
459 static enum pnfs_status map_ds_error(
460 IN enum nfsstat4 nfsstat,
461 IN pnfs_layout_state *state,
462 IN const pnfs_file_layout *layout)
463 {
464 switch (nfsstat) {
465 case NO_ERROR:
466 return PNFS_SUCCESS;
467
468 /* 13.11 Layout Revocation and Fencing
469 * http://tools.ietf.org/html/rfc5661#section-13.11
470 * if we've been fenced, we'll either get ERR_STALE when we PUTFH
471 * something in layout.filehandles, or ERR_PNFS_NO_LAYOUT when
472 * attempting to READ or WRITE */
473 case NFS4ERR_STALE:
474 case NFS4ERR_PNFS_NO_LAYOUT:
475 dprintf(IOLVL, "data server fencing detected!\n");
476
477 pnfs_layout_recall_fenced(state, &layout->layout);
478
479 /* return CHANGED to prevent any further use of the layout */
480 return PNFSERR_LAYOUT_CHANGED;
481
482 default:
483 return PNFSERR_IO;
484 }
485 }
486
487 static uint32_t WINAPI file_layout_read_thread(void *args)
488 {
489 pnfs_io_unit io;
490 stateid_arg stateid;
491 pnfs_io_thread *thread = (pnfs_io_thread*)args;
492 pnfs_io_pattern *pattern = thread->pattern;
493 pnfs_data_server *server;
494 nfs41_client *client;
495 uint32_t maxreadsize, bytes_read, total_read;
496 enum pnfs_status status;
497 enum nfsstat4 nfsstat;
498 bool_t eof;
499
500 dprintf(IOLVL, "--> file_layout_read_thread(%u)\n", thread->id);
501
502 /* get the data server for this thread */
503 status = thread_data_server(thread, &server);
504 if (status) {
505 eprintf("thread_data_server() failed with %s\n",
506 pnfs_error_string(status));
507 goto out;
508 }
509 /* find or establish a client for this data server */
510 status = pnfs_data_server_client(pattern->root,
511 server, pattern->default_lease, &client);
512 if (status) {
513 eprintf("pnfs_data_server_client() failed with %s\n",
514 pnfs_error_string(status));
515 goto out;
516 }
517
518 memcpy(&stateid, pattern->stateid, sizeof(stateid));
519 stateid.stateid.seqid = 0;
520
521 total_read = 0;
522 while (thread_next_unit(thread, &io) == PNFS_PENDING) {
523 maxreadsize = max_read_size(client->session, &thread->file->fh);
524 if (io.length > maxreadsize)
525 io.length = maxreadsize;
526
527 nfsstat = nfs41_read(client->session, thread->file, &stateid,
528 io.offset, (uint32_t)io.length, io.buffer, &bytes_read, &eof);
529 if (nfsstat) {
530 eprintf("nfs41_read() failed with %s\n",
531 nfs_error_string(nfsstat));
532 status = map_ds_error(nfsstat, pattern->state, thread->layout);
533 break;
534 }
535
536 total_read += bytes_read;
537 thread->offset += bytes_read;
538
539 if (eof) {
540 dprintf(IOLVL, "read thread %u reached eof: offset %llu\n",
541 thread->id, thread->offset);
542 status = total_read ? PNFS_SUCCESS : PNFS_READ_EOF;
543 break;
544 }
545 }
546 out:
547 dprintf(IOLVL, "<-- file_layout_read_thread(%u) returning %s\n",
548 thread->id, pnfs_error_string(status));
549 return status;
550 }
551
552 static uint32_t WINAPI file_layout_write_thread(void *args)
553 {
554 pnfs_io_unit io;
555 stateid_arg stateid;
556 pnfs_io_thread *thread = (pnfs_io_thread*)args;
557 pnfs_io_pattern *pattern = thread->pattern;
558 pnfs_data_server *server;
559 nfs41_client *client;
560 const uint64_t offset_start = thread->offset;
561 uint64_t commit_min, commit_max;
562 uint32_t maxwritesize, bytes_written, total_written;
563 enum pnfs_status status;
564 enum nfsstat4 nfsstat;
565
566 dprintf(IOLVL, "--> file_layout_write_thread(%u)\n", thread->id);
567
568 /* get the data server for this thread */
569 status = thread_data_server(thread, &server);
570 if (status) {
571 eprintf("thread_data_server() failed with %s\n",
572 pnfs_error_string(status));
573 goto out;
574 }
575 /* find or establish a client for this data server */
576 status = pnfs_data_server_client(pattern->root,
577 server, pattern->default_lease, &client);
578 if (status) {
579 eprintf("pnfs_data_server_client() failed with %s\n",
580 pnfs_error_string(status));
581 goto out;
582 }
583
584 memcpy(&stateid, pattern->stateid, sizeof(stateid));
585 stateid.stateid.seqid = 0;
586
587 maxwritesize = max_write_size(client->session, &thread->file->fh);
588
589 retry_write:
590 thread->offset = offset_start;
591 thread->stable = FILE_SYNC4;
592 commit_min = NFS4_UINT64_MAX;
593 commit_max = 0;
594 total_written = 0;
595
596 while (thread_next_unit(thread, &io) == PNFS_PENDING) {
597 if (io.length > maxwritesize)
598 io.length = maxwritesize;
599
600 nfsstat = nfs41_write(client->session, thread->file, &stateid,
601 io.buffer, (uint32_t)io.length, io.offset, UNSTABLE4,
602 &bytes_written, &thread->verf, NULL);
603 if (nfsstat) {
604 eprintf("nfs41_write() failed with %s\n",
605 nfs_error_string(nfsstat));
606 status = map_ds_error(nfsstat, pattern->state, thread->layout);
607 break;
608 }
609 if (!verify_write(&thread->verf, &thread->stable))
610 goto retry_write;
611
612 total_written += bytes_written;
613 thread->offset += bytes_written;
614
615 /* track the range for commit */
616 if (commit_min > io.offset)
617 commit_min = io.offset;
618 if (commit_max < io.offset + io.length)
619 commit_max = io.offset + io.length;
620 }
621
622 /* nothing to commit */
623 if (commit_max <= commit_min)
624 goto out;
625 /* layout changed; redo all io against metadata server */
626 if (status == PNFSERR_LAYOUT_CHANGED)
627 goto out;
628 /* the data is already in stable storage */
629 if (thread->stable != UNSTABLE4)
630 goto out;
631 /* the metadata server expects us to commit there instead */
632 if (should_commit_to_mds(thread->layout))
633 goto out;
634
635 dprintf(1, "sending COMMIT to data server for offset=%lld len=%lld\n",
636 commit_min, commit_max - commit_min);
637 nfsstat = nfs41_commit(client->session, thread->file,
638 commit_min, (uint32_t)(commit_max - commit_min), 0, &thread->verf, NULL);
639
640 if (nfsstat)
641 status = map_ds_error(nfsstat, pattern->state, thread->layout);
642 else if (!verify_commit(&thread->verf)) {
643 /* resend the writes unless the layout was recalled */
644 if (status != PNFSERR_LAYOUT_RECALLED)
645 goto retry_write;
646 status = PNFSERR_IO;
647 } else {
648 /* on successful commit, leave pnfs_status unchanged; if the
649 * layout was recalled, we still want to return the error */
650 thread->stable = DATA_SYNC4;
651 }
652 out:
653 dprintf(IOLVL, "<-- file_layout_write_thread(%u) returning %s\n",
654 thread->id, pnfs_error_string(status));
655 return status;
656 }
657
658
659 enum pnfs_status pnfs_read(
660 IN nfs41_root *root,
661 IN nfs41_open_state *state,
662 IN stateid_arg *stateid,
663 IN pnfs_layout_state *layout,
664 IN uint64_t offset,
665 IN uint64_t length,
666 OUT unsigned char *buffer_out,
667 OUT ULONG *len_out)
668 {
669 pnfs_io_pattern pattern;
670 enum pnfs_status status;
671
672 dprintf(IOLVL, "--> pnfs_read(%llu, %llu)\n", offset, length);
673
674 *len_out = 0;
675
676 AcquireSRWLockExclusive(&layout->lock);
677
678 /* get layouts/devices for the entire range; PNFS_PENDING means we
679 * dropped the lock to send an rpc, so repeat until it succeeds */
680 do {
681 status = pnfs_layout_state_prepare(layout, state->session,
682 &state->file, stateid, PNFS_IOMODE_READ, offset, length);
683 } while (status == PNFS_PENDING);
684
685 if (status == PNFS_SUCCESS) {
686 /* interpret the layout and set up threads for io */
687 status = pattern_init(&pattern, root, &state->file, stateid,
688 layout, buffer_out, PNFS_IOMODE_READ, offset, length,
689 state->session->lease_time);
690 if (status)
691 eprintf("pattern_init() failed with %s\n",
692 pnfs_error_string(status));
693 }
694
695 ReleaseSRWLockExclusive(&layout->lock);
696
697 if (status)
698 goto out;
699
700 status = pattern_fork(&pattern, file_layout_read_thread);
701 if (status != PNFS_SUCCESS && status != PNFS_READ_EOF)
702 goto out_free_pattern;
703
704 *len_out = (ULONG)pattern_bytes_transferred(&pattern, NULL);
705
706 out_free_pattern:
707 pattern_free(&pattern);
708 out:
709 dprintf(IOLVL, "<-- pnfs_read() returning %s\n",
710 pnfs_error_string(status));
711 return status;
712 }
713
714 static enum pnfs_status mds_commit(
715 IN nfs41_open_state *state,
716 IN uint64_t offset,
717 IN uint32_t length,
718 IN const pnfs_io_pattern *pattern,
719 OUT nfs41_file_info *info)
720 {
721 nfs41_write_verf verf;
722 enum nfsstat4 nfsstat;
723 enum pnfs_status status = PNFS_SUCCESS;
724 uint32_t i;
725
726 nfsstat = nfs41_commit(state->session,
727 &state->file, offset, length, 1, &verf, info);
728 if (nfsstat) {
729 eprintf("nfs41_commit() to mds failed with %s\n",
730 nfs_error_string(nfsstat));
731 status = PNFSERR_IO;
732 goto out;
733 }
734
735 /* 13.7. COMMIT through Metadata Server:
736 * If nfl_util & NFL4_UFLG_COMMIT_THRU_MDS is TRUE, then in order to
737 * maintain the current NFSv4.1 commit and recovery model, the data
738 * servers MUST return a common writeverf verifier in all WRITE
739 * responses for a given file layout, and the metadata server's
740 * COMMIT implementation must return the same writeverf. */
741 for (i = 0; i < pattern->count; i++) {
742 const pnfs_io_thread *thread = &pattern->threads[i];
743 if (thread->stable != UNSTABLE4) /* already committed */
744 continue;
745
746 if (!should_commit_to_mds(thread->layout)) {
747 /* commit to mds is not allowed on this layout */
748 eprintf("mds commit: failed to commit to data server\n");
749 status = PNFSERR_IO;
750 break;
751 }
752 if (memcmp(verf.verf, thread->verf.verf, NFS4_VERIFIER_SIZE) != 0) {
753 eprintf("mds commit verifier doesn't match ds write verifiers\n");
754 status = PNFSERR_IO;
755 break;
756 }
757 }
758 out:
759 return status;
760 }
761
762 static enum pnfs_status layout_commit(
763 IN nfs41_open_state *state,
764 IN pnfs_layout_state *layout,
765 IN uint64_t offset,
766 IN uint64_t length,
767 OUT nfs41_file_info *info)
768 {
769 stateid4 layout_stateid;
770 uint64_t last_offset = offset + length - 1;
771 uint64_t *new_last_offset = NULL;
772 enum nfsstat4 nfsstat;
773 enum pnfs_status status = PNFS_SUCCESS;
774
775 AcquireSRWLockExclusive(&state->lock);
776 /* if this is past the current eof, update the open state's
777 * last offset, and pass a pointer to LAYOUTCOMMIT */
778 if (state->pnfs_last_offset < last_offset ||
779 (state->pnfs_last_offset == 0 && last_offset == 0)) {
780 state->pnfs_last_offset = last_offset;
781 new_last_offset = &last_offset;
782 }
783 ReleaseSRWLockExclusive(&state->lock);
784
785 AcquireSRWLockShared(&layout->lock);
786 memcpy(&layout_stateid, &layout->stateid, sizeof(layout_stateid));
787 ReleaseSRWLockShared(&layout->lock);
788
789 dprintf(1, "LAYOUTCOMMIT for offset=%lld len=%lld new_last_offset=%u\n",
790 offset, length, new_last_offset ? 1 : 0);
791 nfsstat = pnfs_rpc_layoutcommit(state->session, &state->file,
792 &layout_stateid, offset, length, new_last_offset, NULL, info);
793 if (nfsstat) {
794 dprintf(IOLVL, "pnfs_rpc_layoutcommit() failed with %s\n",
795 nfs_error_string(nfsstat));
796 status = PNFSERR_IO;
797 }
798 return status;
799 }
800
801 enum pnfs_status pnfs_write(
802 IN nfs41_root *root,
803 IN nfs41_open_state *state,
804 IN stateid_arg *stateid,
805 IN pnfs_layout_state *layout,
806 IN uint64_t offset,
807 IN uint64_t length,
808 IN unsigned char *buffer,
809 OUT ULONG *len_out,
810 OUT nfs41_file_info *info)
811 {
812 pnfs_io_pattern pattern;
813 enum stable_how4 stable;
814 enum pnfs_status status;
815
816 dprintf(IOLVL, "--> pnfs_write(%llu, %llu)\n", offset, length);
817
818 *len_out = 0;
819
820 AcquireSRWLockExclusive(&layout->lock);
821
822 /* get layouts/devices for the entire range; PNFS_PENDING means we
823 * dropped the lock to send an rpc, so repeat until it succeeds */
824 do {
825 status = pnfs_layout_state_prepare(layout, state->session,
826 &state->file, stateid, PNFS_IOMODE_RW, offset, length);
827 } while (status == PNFS_PENDING);
828
829 if (status == PNFS_SUCCESS) {
830 /* interpret the layout and set up threads for io */
831 status = pattern_init(&pattern, root, &state->file, stateid,
832 layout, buffer, PNFS_IOMODE_RW, offset, length,
833 state->session->lease_time);
834 if (status)
835 eprintf("pattern_init() failed with %s\n",
836 pnfs_error_string(status));
837 }
838
839 ReleaseSRWLockExclusive(&layout->lock);
840
841 if (status)
842 goto out;
843
844 status = pattern_fork(&pattern, file_layout_write_thread);
845 /* on layout recall, we still attempt to commit what we wrote */
846 if (status != PNFS_SUCCESS && status != PNFSERR_LAYOUT_RECALLED)
847 goto out_free_pattern;
848
849 *len_out = (ULONG)pattern_bytes_transferred(&pattern, &stable);
850 if (*len_out == 0)
851 goto out_free_pattern;
852
853 if (stable == UNSTABLE4) {
854 /* send COMMIT to the mds and verify against all ds writes */
855 status = mds_commit(state, offset, *len_out, &pattern, info);
856 } else if (stable == DATA_SYNC4) {
857 /* send LAYOUTCOMMIT to sync the metadata */
858 status = layout_commit(state, layout, offset, *len_out, info);
859 } else {
860 /* send a GETATTR to update the cached size */
861 bitmap4 attr_request;
862 nfs41_superblock_getattr_mask(state->file.fh.superblock, &attr_request);
863 nfs41_getattr(state->session, &state->file, &attr_request, info);
864 }
865 out_free_pattern:
866 pattern_free(&pattern);
867 out:
868 dprintf(IOLVL, "<-- pnfs_write() returning %s\n",
869 pnfs_error_string(status));
870 return status;
871 }