blob: cba3d0278b86a141d0707b64234c68d2ec5a47c4 [file] [log] [blame]
lh9ed821d2023-04-07 01:36:19 -07001/*
2 rbd.c -- Export ceph rados objects as a Linux block device
3
4
5 based on drivers/block/osdblk.c:
6
7 Copyright 2009 Red Hat, Inc.
8
9 This program is free software; you can redistribute it and/or modify
10 it under the terms of the GNU General Public License as published by
11 the Free Software Foundation.
12
13 This program is distributed in the hope that it will be useful,
14 but WITHOUT ANY WARRANTY; without even the implied warranty of
15 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
16 GNU General Public License for more details.
17
18 You should have received a copy of the GNU General Public License
19 along with this program; see the file COPYING. If not, write to
20 the Free Software Foundation, 675 Mass Ave, Cambridge, MA 02139, USA.
21
22
23
24 For usage instructions, please refer to:
25
26 Documentation/ABI/testing/sysfs-bus-rbd
27
28 */
29
30#include <linux/ceph/libceph.h>
31#include <linux/ceph/osd_client.h>
32#include <linux/ceph/mon_client.h>
33#include <linux/ceph/decode.h>
34#include <linux/parser.h>
35
36#include <linux/kernel.h>
37#include <linux/device.h>
38#include <linux/module.h>
39#include <linux/fs.h>
40#include <linux/blkdev.h>
41
42#include "rbd_types.h"
43
/*
 * Block I/O in Linux (blk, bio, genhd) is expressed in sectors, and
 * by default a sector is 512 bytes everywhere.  Naming the shift and
 * the size is only marginally more expressive than the raw numbers,
 * but it keeps them out of the code below.
 */
#define SECTOR_SHIFT	9
#define SECTOR_SIZE	(1ULL << SECTOR_SHIFT)

#define RBD_DRV_NAME "rbd"
#define RBD_DRV_NAME_LONG "rbd (rados block device)"

#define RBD_MINORS_PER_MAJOR	256		/* max minors per blkdev */

/* Buffer sizes for the names and option strings kept per device */
#define RBD_MAX_MD_NAME_LEN	(RBD_MAX_OBJ_NAME_LEN + sizeof(RBD_SUFFIX))
#define RBD_MAX_POOL_NAME_LEN	64
#define RBD_MAX_SNAP_NAME_LEN	32
#define RBD_MAX_OPT_LEN		1024

/* Snapshot name that selects the image head rather than a snapshot */
#define RBD_SNAP_HEAD_NAME	"-"

/*
 * Device names have the form "rbd#": the RBD_DRV_NAME prefix followed
 * by a unique integer id.  MAX_INT_FORMAT_WIDTH exists so that
 * DEV_NAME_LEN can be checked to be large enough for every possible
 * device name.
 */
#define DEV_NAME_LEN		32
#define MAX_INT_FORMAT_WIDTH	((5 * sizeof (int)) / 2 + 1)

#define RBD_READ_ONLY_DEFAULT		false
75
76/*
77 * block device image metadata (in-memory version)
78 */
79struct rbd_image_header {
80 u64 image_size;
81 char block_name[32];
82 __u8 obj_order;
83 __u8 crypt_type;
84 __u8 comp_type;
85 struct ceph_snap_context *snapc;
86 size_t snap_names_len;
87 u64 snap_seq;
88 u32 total_snaps;
89
90 char *snap_names;
91 u64 *snap_sizes;
92
93 u64 obj_version;
94};
95
96struct rbd_options {
97 bool read_only;
98};
99
100/*
101 * an instance of the client. multiple devices may share an rbd client.
102 */
103struct rbd_client {
104 struct ceph_client *client;
105 struct rbd_options *rbd_opts;
106 struct kref kref;
107 struct list_head node;
108};
109
110/*
111 * a request completion status
112 */
113struct rbd_req_status {
114 int done;
115 int rc;
116 u64 bytes;
117};
118
119/*
120 * a collection of requests
121 */
122struct rbd_req_coll {
123 int total;
124 int num_done;
125 struct kref kref;
126 struct rbd_req_status status[0];
127};
128
129/*
130 * a single io request
131 */
132struct rbd_request {
133 struct request *rq; /* blk layer request */
134 struct bio *bio; /* cloned bio */
135 struct page **pages; /* list of used pages */
136 u64 len;
137 int coll_index;
138 struct rbd_req_coll *coll;
139};
140
141struct rbd_snap {
142 struct device dev;
143 const char *name;
144 size_t size;
145 struct list_head node;
146 u64 id;
147};
148
149/*
150 * a single device
151 */
152struct rbd_device {
153 int id; /* blkdev unique id */
154
155 int major; /* blkdev assigned major */
156 struct gendisk *disk; /* blkdev's gendisk and rq */
157 struct request_queue *q;
158
159 struct rbd_client *rbd_client;
160
161 char name[DEV_NAME_LEN]; /* blkdev name, e.g. rbd3 */
162
163 spinlock_t lock; /* queue lock */
164
165 struct rbd_image_header header;
166 char obj[RBD_MAX_OBJ_NAME_LEN]; /* rbd image name */
167 int obj_len;
168 char obj_md_name[RBD_MAX_MD_NAME_LEN]; /* hdr nm. */
169 char pool_name[RBD_MAX_POOL_NAME_LEN];
170 int poolid;
171
172 struct ceph_osd_event *watch_event;
173 struct ceph_osd_request *watch_request;
174
175 /* protects updating the header */
176 struct rw_semaphore header_rwsem;
177 /* name of the snapshot this device reads from */
178 char snap_name[RBD_MAX_SNAP_NAME_LEN];
179 /* id of the snapshot this device reads from */
180 u64 snap_id; /* current snapshot id */
181 /* whether the snap_id this device reads from still exists */
182 bool snap_exists;
183 bool read_only;
184
185 struct list_head node;
186
187 /* list of snapshots */
188 struct list_head snaps;
189
190 /* sysfs related */
191 struct device dev;
192 unsigned long open_count;
193};
194
195static DEFINE_MUTEX(ctl_mutex); /* Serialize open/close/setup/teardown */
196
197static LIST_HEAD(rbd_dev_list); /* devices */
198static DEFINE_SPINLOCK(rbd_dev_list_lock);
199
200static LIST_HEAD(rbd_client_list); /* clients */
201static DEFINE_SPINLOCK(rbd_client_list_lock);
202
203static int __rbd_init_snaps_header(struct rbd_device *rbd_dev);
204static void rbd_dev_release(struct device *dev);
205static void __rbd_remove_snap_dev(struct rbd_device *rbd_dev,
206 struct rbd_snap *snap);
207
208static ssize_t rbd_add(struct bus_type *bus, const char *buf,
209 size_t count);
210static ssize_t rbd_remove(struct bus_type *bus, const char *buf,
211 size_t count);
212
213static struct bus_attribute rbd_bus_attrs[] = {
214 __ATTR(add, S_IWUSR, NULL, rbd_add),
215 __ATTR(remove, S_IWUSR, NULL, rbd_remove),
216 __ATTR_NULL
217};
218
219static struct bus_type rbd_bus_type = {
220 .name = "rbd",
221 .bus_attrs = rbd_bus_attrs,
222};
223
/*
 * Release callback for rbd_root_dev.  The root device is a static
 * object, so there is nothing to free here.
 */
static void rbd_root_dev_release(struct device *dev)
{
	/* intentionally empty */
}
227
228static struct device rbd_root_dev = {
229 .init_name = "rbd",
230 .release = rbd_root_dev_release,
231};
232
233
234static struct device *rbd_get_dev(struct rbd_device *rbd_dev)
235{
236 return get_device(&rbd_dev->dev);
237}
238
239static void rbd_put_dev(struct rbd_device *rbd_dev)
240{
241 put_device(&rbd_dev->dev);
242}
243
244static int __rbd_update_snaps(struct rbd_device *rbd_dev);
245
246static int rbd_open(struct block_device *bdev, fmode_t mode)
247{
248 struct rbd_device *rbd_dev = bdev->bd_disk->private_data;
249
250 if ((mode & FMODE_WRITE) && rbd_dev->read_only)
251 return -EROFS;
252
253 mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING);
254 rbd_get_dev(rbd_dev);
255 set_device_ro(bdev, rbd_dev->read_only);
256 rbd_dev->open_count++;
257 mutex_unlock(&ctl_mutex);
258
259 return 0;
260}
261
262static int rbd_release(struct gendisk *disk, fmode_t mode)
263{
264 struct rbd_device *rbd_dev = disk->private_data;
265
266 mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING);
267 BUG_ON(!rbd_dev->open_count);
268 rbd_dev->open_count--;
269 rbd_put_dev(rbd_dev);
270 mutex_unlock(&ctl_mutex);
271
272 return 0;
273}
274
275static const struct block_device_operations rbd_bd_ops = {
276 .owner = THIS_MODULE,
277 .open = rbd_open,
278 .release = rbd_release,
279};
280
281/*
282 * Initialize an rbd client instance.
283 * We own *opt.
284 */
285static struct rbd_client *rbd_client_create(struct ceph_options *opt,
286 struct rbd_options *rbd_opts)
287{
288 struct rbd_client *rbdc;
289 int ret = -ENOMEM;
290
291 dout("rbd_client_create\n");
292 rbdc = kmalloc(sizeof(struct rbd_client), GFP_KERNEL);
293 if (!rbdc)
294 goto out_opt;
295
296 kref_init(&rbdc->kref);
297 INIT_LIST_HEAD(&rbdc->node);
298
299 mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING);
300
301 rbdc->client = ceph_create_client(opt, rbdc, 0, 0);
302 if (IS_ERR(rbdc->client))
303 goto out_mutex;
304 opt = NULL; /* Now rbdc->client is responsible for opt */
305
306 ret = ceph_open_session(rbdc->client);
307 if (ret < 0)
308 goto out_err;
309
310 rbdc->rbd_opts = rbd_opts;
311
312 spin_lock(&rbd_client_list_lock);
313 list_add_tail(&rbdc->node, &rbd_client_list);
314 spin_unlock(&rbd_client_list_lock);
315
316 mutex_unlock(&ctl_mutex);
317
318 dout("rbd_client_create created %p\n", rbdc);
319 return rbdc;
320
321out_err:
322 ceph_destroy_client(rbdc->client);
323out_mutex:
324 mutex_unlock(&ctl_mutex);
325 kfree(rbdc);
326out_opt:
327 if (opt)
328 ceph_destroy_options(opt);
329 return ERR_PTR(ret);
330}
331
332/*
333 * Find a ceph client with specific addr and configuration.
334 */
335static struct rbd_client *__rbd_client_find(struct ceph_options *opt)
336{
337 struct rbd_client *client_node;
338
339 if (opt->flags & CEPH_OPT_NOSHARE)
340 return NULL;
341
342 list_for_each_entry(client_node, &rbd_client_list, node)
343 if (ceph_compare_options(opt, client_node->client) == 0)
344 return client_node;
345 return NULL;
346}
347
/*
 * Mount option tokens.  The Opt_last_* markers split the token space
 * into int, string and Boolean ranges; there are currently no int or
 * string options.
 */
enum {
	Opt_last_int,		/* int args come before this marker */
	Opt_last_string,	/* string args come before this marker */
	Opt_read_only,
	Opt_read_write,
	Opt_last_bool,		/* Boolean args come before this marker */
};
361
362static match_table_t rbdopt_tokens = {
363 /* int args above */
364 /* string args above */
365 {Opt_read_only, "read_only"},
366 {Opt_read_only, "ro"}, /* Alternate spelling */
367 {Opt_read_write, "read_write"},
368 {Opt_read_write, "rw"}, /* Alternate spelling */
369 /* Boolean args above */
370 {-1, NULL}
371};
372
373static int parse_rbd_opts_token(char *c, void *private)
374{
375 struct rbd_options *rbdopt = private;
376 substring_t argstr[MAX_OPT_ARGS];
377 int token, intval, ret;
378
379 token = match_token(c, rbdopt_tokens, argstr);
380 if (token < 0)
381 return -EINVAL;
382
383 if (token < Opt_last_int) {
384 ret = match_int(&argstr[0], &intval);
385 if (ret < 0) {
386 pr_err("bad mount option arg (not int) "
387 "at '%s'\n", c);
388 return ret;
389 }
390 dout("got int token %d val %d\n", token, intval);
391 } else if (token > Opt_last_int && token < Opt_last_string) {
392 dout("got string token %d val %s\n", token,
393 argstr[0].from);
394 } else if (token > Opt_last_string && token < Opt_last_bool) {
395 dout("got Boolean token %d\n", token);
396 } else {
397 dout("got token %d\n", token);
398 }
399
400 switch (token) {
401 case Opt_read_only:
402 rbdopt->read_only = true;
403 break;
404 case Opt_read_write:
405 rbdopt->read_only = false;
406 break;
407 default:
408 BUG_ON(token);
409 }
410 return 0;
411}
412
413/*
414 * Get a ceph client with specific addr and configuration, if one does
415 * not exist create it.
416 */
417static struct rbd_client *rbd_get_client(const char *mon_addr,
418 size_t mon_addr_len,
419 char *options)
420{
421 struct rbd_client *rbdc;
422 struct ceph_options *opt;
423 struct rbd_options *rbd_opts;
424
425 rbd_opts = kzalloc(sizeof(*rbd_opts), GFP_KERNEL);
426 if (!rbd_opts)
427 return ERR_PTR(-ENOMEM);
428
429 rbd_opts->read_only = RBD_READ_ONLY_DEFAULT;
430
431 opt = ceph_parse_options(options, mon_addr,
432 mon_addr + mon_addr_len,
433 parse_rbd_opts_token, rbd_opts);
434 if (IS_ERR(opt)) {
435 kfree(rbd_opts);
436 return ERR_CAST(opt);
437 }
438
439 spin_lock(&rbd_client_list_lock);
440 rbdc = __rbd_client_find(opt);
441 if (rbdc) {
442 /* using an existing client */
443 kref_get(&rbdc->kref);
444 spin_unlock(&rbd_client_list_lock);
445
446 ceph_destroy_options(opt);
447 kfree(rbd_opts);
448
449 return rbdc;
450 }
451 spin_unlock(&rbd_client_list_lock);
452
453 rbdc = rbd_client_create(opt, rbd_opts);
454
455 if (IS_ERR(rbdc))
456 kfree(rbd_opts);
457
458 return rbdc;
459}
460
461/*
462 * Destroy ceph client
463 *
464 * Caller must hold rbd_client_list_lock.
465 */
466static void rbd_client_release(struct kref *kref)
467{
468 struct rbd_client *rbdc = container_of(kref, struct rbd_client, kref);
469
470 dout("rbd_release_client %p\n", rbdc);
471 spin_lock(&rbd_client_list_lock);
472 list_del(&rbdc->node);
473 spin_unlock(&rbd_client_list_lock);
474
475 ceph_destroy_client(rbdc->client);
476 kfree(rbdc->rbd_opts);
477 kfree(rbdc);
478}
479
480/*
481 * Drop reference to ceph client node. If it's not referenced anymore, release
482 * it.
483 */
484static void rbd_put_client(struct rbd_device *rbd_dev)
485{
486 kref_put(&rbd_dev->rbd_client->kref, rbd_client_release);
487 rbd_dev->rbd_client = NULL;
488}
489
490/*
491 * Destroy requests collection
492 */
493static void rbd_coll_release(struct kref *kref)
494{
495 struct rbd_req_coll *coll =
496 container_of(kref, struct rbd_req_coll, kref);
497
498 dout("rbd_coll_release %p\n", coll);
499 kfree(coll);
500}
501
502/*
503 * Create a new header structure, translate header format from the on-disk
504 * header.
505 */
506static int rbd_header_from_disk(struct rbd_image_header *header,
507 struct rbd_image_header_ondisk *ondisk,
508 int allocated_snaps,
509 gfp_t gfp_flags)
510{
511 int i;
512 u32 snap_count;
513
514 if (memcmp(ondisk, RBD_HEADER_TEXT, sizeof(RBD_HEADER_TEXT)))
515 return -ENXIO;
516
517 snap_count = le32_to_cpu(ondisk->snap_count);
518 header->snapc = kmalloc(sizeof(struct ceph_snap_context) +
519 snap_count * sizeof(u64),
520 gfp_flags);
521 if (!header->snapc)
522 return -ENOMEM;
523
524 header->snap_names_len = le64_to_cpu(ondisk->snap_names_len);
525 if (snap_count) {
526 header->snap_names = kmalloc(header->snap_names_len,
527 GFP_KERNEL);
528 if (!header->snap_names)
529 goto err_snapc;
530 header->snap_sizes = kmalloc(snap_count * sizeof(u64),
531 GFP_KERNEL);
532 if (!header->snap_sizes)
533 goto err_names;
534 } else {
535 header->snap_names = NULL;
536 header->snap_sizes = NULL;
537 }
538 memcpy(header->block_name, ondisk->block_name,
539 sizeof(ondisk->block_name));
540
541 header->image_size = le64_to_cpu(ondisk->image_size);
542 header->obj_order = ondisk->options.order;
543 header->crypt_type = ondisk->options.crypt_type;
544 header->comp_type = ondisk->options.comp_type;
545
546 atomic_set(&header->snapc->nref, 1);
547 header->snap_seq = le64_to_cpu(ondisk->snap_seq);
548 header->snapc->num_snaps = snap_count;
549 header->total_snaps = snap_count;
550
551 if (snap_count && allocated_snaps == snap_count) {
552 for (i = 0; i < snap_count; i++) {
553 header->snapc->snaps[i] =
554 le64_to_cpu(ondisk->snaps[i].id);
555 header->snap_sizes[i] =
556 le64_to_cpu(ondisk->snaps[i].image_size);
557 }
558
559 /* copy snapshot names */
560 memcpy(header->snap_names, &ondisk->snaps[i],
561 header->snap_names_len);
562 }
563
564 return 0;
565
566err_names:
567 kfree(header->snap_names);
568err_snapc:
569 kfree(header->snapc);
570 return -ENOMEM;
571}
572
573static int snap_by_name(struct rbd_image_header *header, const char *snap_name,
574 u64 *seq, u64 *size)
575{
576 int i;
577 char *p = header->snap_names;
578
579 for (i = 0; i < header->total_snaps; i++) {
580 if (!strcmp(snap_name, p)) {
581
582 /* Found it. Pass back its id and/or size */
583
584 if (seq)
585 *seq = header->snapc->snaps[i];
586 if (size)
587 *size = header->snap_sizes[i];
588 return i;
589 }
590 p += strlen(p) + 1; /* Skip ahead to the next name */
591 }
592 return -ENOENT;
593}
594
595static int rbd_header_set_snap(struct rbd_device *dev, u64 *size)
596{
597 struct rbd_image_header *header = &dev->header;
598 struct ceph_snap_context *snapc = header->snapc;
599 int ret = -ENOENT;
600
601 BUILD_BUG_ON(sizeof (dev->snap_name) < sizeof (RBD_SNAP_HEAD_NAME));
602
603 down_write(&dev->header_rwsem);
604
605 if (!memcmp(dev->snap_name, RBD_SNAP_HEAD_NAME,
606 sizeof (RBD_SNAP_HEAD_NAME))) {
607 if (header->total_snaps)
608 snapc->seq = header->snap_seq;
609 else
610 snapc->seq = 0;
611 dev->snap_id = CEPH_NOSNAP;
612 dev->snap_exists = false;
613 dev->read_only = dev->rbd_client->rbd_opts->read_only;
614 if (size)
615 *size = header->image_size;
616 } else {
617 ret = snap_by_name(header, dev->snap_name, &snapc->seq, size);
618 if (ret < 0)
619 goto done;
620 dev->snap_id = snapc->seq;
621 dev->snap_exists = true;
622 dev->read_only = true; /* No choice for snapshots */
623 }
624
625 ret = 0;
626done:
627 up_write(&dev->header_rwsem);
628 return ret;
629}
630
631static void rbd_header_free(struct rbd_image_header *header)
632{
633 ceph_put_snap_context(header->snapc);
634 kfree(header->snap_names);
635 kfree(header->snap_sizes);
636}
637
638/*
639 * get the actual striped segment name, offset and length
640 */
641static u64 rbd_get_segment(struct rbd_image_header *header,
642 const char *block_name,
643 u64 ofs, u64 len,
644 char *seg_name, u64 *segofs)
645{
646 u64 seg = ofs >> header->obj_order;
647
648 if (seg_name)
649 snprintf(seg_name, RBD_MAX_SEG_NAME_LEN,
650 "%s.%012llx", block_name, seg);
651
652 ofs = ofs & ((1 << header->obj_order) - 1);
653 len = min_t(u64, len, (1 << header->obj_order) - ofs);
654
655 if (segofs)
656 *segofs = ofs;
657
658 return len;
659}
660
661static int rbd_get_num_segments(struct rbd_image_header *header,
662 u64 ofs, u64 len)
663{
664 u64 start_seg = ofs >> header->obj_order;
665 u64 end_seg = (ofs + len - 1) >> header->obj_order;
666 return end_seg - start_seg + 1;
667}
668
669/*
670 * returns the size of an object in the image
671 */
672static u64 rbd_obj_bytes(struct rbd_image_header *header)
673{
674 return 1 << header->obj_order;
675}
676
677/*
678 * bio helpers
679 */
680
681static void bio_chain_put(struct bio *chain)
682{
683 struct bio *tmp;
684
685 while (chain) {
686 tmp = chain;
687 chain = chain->bi_next;
688 bio_put(tmp);
689 }
690}
691
692/*
693 * zeros a bio chain, starting at specific offset
694 */
695static void zero_bio_chain(struct bio *chain, int start_ofs)
696{
697 struct bio_vec *bv;
698 unsigned long flags;
699 void *buf;
700 int i;
701 int pos = 0;
702
703 while (chain) {
704 bio_for_each_segment(bv, chain, i) {
705 if (pos + bv->bv_len > start_ofs) {
706 int remainder = max(start_ofs - pos, 0);
707 buf = bvec_kmap_irq(bv, &flags);
708 memset(buf + remainder, 0,
709 bv->bv_len - remainder);
710 bvec_kunmap_irq(buf, &flags);
711 }
712 pos += bv->bv_len;
713 }
714
715 chain = chain->bi_next;
716 }
717}
718
719/*
720 * bio_chain_clone - clone a chain of bios up to a certain length.
721 * might return a bio_pair that will need to be released.
722 */
723static struct bio *bio_chain_clone(struct bio **old, struct bio **next,
724 struct bio_pair **bp,
725 int len, gfp_t gfpmask)
726{
727 struct bio *tmp, *old_chain = *old, *new_chain = NULL, *tail = NULL;
728 int total = 0;
729
730 if (*bp) {
731 bio_pair_release(*bp);
732 *bp = NULL;
733 }
734
735 while (old_chain && (total < len)) {
736 tmp = bio_kmalloc(gfpmask, old_chain->bi_max_vecs);
737 if (!tmp)
738 goto err_out;
739
740 if (total + old_chain->bi_size > len) {
741 struct bio_pair *bp;
742
743 /*
744 * this split can only happen with a single paged bio,
745 * split_bio will BUG_ON if this is not the case
746 */
747 dout("bio_chain_clone split! total=%d remaining=%d"
748 "bi_size=%d\n",
749 (int)total, (int)len-total,
750 (int)old_chain->bi_size);
751
752 /* split the bio. We'll release it either in the next
753 call, or it will have to be released outside */
754 bp = bio_split(old_chain, (len - total) / SECTOR_SIZE);
755 if (!bp)
756 goto err_out;
757
758 __bio_clone(tmp, &bp->bio1);
759
760 *next = &bp->bio2;
761 } else {
762 __bio_clone(tmp, old_chain);
763 *next = old_chain->bi_next;
764 }
765
766 tmp->bi_bdev = NULL;
767 gfpmask &= ~__GFP_WAIT;
768 tmp->bi_next = NULL;
769
770 if (!new_chain) {
771 new_chain = tail = tmp;
772 } else {
773 tail->bi_next = tmp;
774 tail = tmp;
775 }
776 old_chain = old_chain->bi_next;
777
778 total += tmp->bi_size;
779 }
780
781 BUG_ON(total < len);
782
783 if (tail)
784 tail->bi_next = NULL;
785
786 *old = old_chain;
787
788 return new_chain;
789
790err_out:
791 dout("bio_chain_clone with err\n");
792 bio_chain_put(new_chain);
793 return NULL;
794}
795
796/*
797 * helpers for osd request op vectors.
798 */
799static int rbd_create_rw_ops(struct ceph_osd_req_op **ops,
800 int num_ops,
801 int opcode,
802 u32 payload_len)
803{
804 *ops = kzalloc(sizeof(struct ceph_osd_req_op) * (num_ops + 1),
805 GFP_NOIO);
806 if (!*ops)
807 return -ENOMEM;
808 (*ops)[0].op = opcode;
809 /*
810 * op extent offset and length will be set later on
811 * in calc_raw_layout()
812 */
813 (*ops)[0].payload_len = payload_len;
814 return 0;
815}
816
/* Free an op vector obtained from rbd_create_rw_ops() */
static void rbd_destroy_ops(struct ceph_osd_req_op *ops)
{
	struct ceph_osd_req_op *vec = ops;

	kfree(vec);
}
821
822static void rbd_coll_end_req_index(struct request *rq,
823 struct rbd_req_coll *coll,
824 int index,
825 int ret, u64 len)
826{
827 struct request_queue *q;
828 int min, max, i;
829
830 dout("rbd_coll_end_req_index %p index %d ret %d len %lld\n",
831 coll, index, ret, len);
832
833 if (!rq)
834 return;
835
836 if (!coll) {
837 blk_end_request(rq, ret, len);
838 return;
839 }
840
841 q = rq->q;
842
843 spin_lock_irq(q->queue_lock);
844 coll->status[index].done = 1;
845 coll->status[index].rc = ret;
846 coll->status[index].bytes = len;
847 max = min = coll->num_done;
848 while (max < coll->total && coll->status[max].done)
849 max++;
850
851 for (i = min; i<max; i++) {
852 __blk_end_request(rq, coll->status[i].rc,
853 coll->status[i].bytes);
854 coll->num_done++;
855 kref_put(&coll->kref, rbd_coll_release);
856 }
857 spin_unlock_irq(q->queue_lock);
858}
859
860static void rbd_coll_end_req(struct rbd_request *req,
861 int ret, u64 len)
862{
863 rbd_coll_end_req_index(req->rq, req->coll, req->coll_index, ret, len);
864}
865
866/*
867 * Send ceph osd request
868 */
869static int rbd_do_request(struct request *rq,
870 struct rbd_device *dev,
871 struct ceph_snap_context *snapc,
872 u64 snapid,
873 const char *obj, u64 ofs, u64 len,
874 struct bio *bio,
875 struct page **pages,
876 int num_pages,
877 int flags,
878 struct ceph_osd_req_op *ops,
879 int num_reply,
880 struct rbd_req_coll *coll,
881 int coll_index,
882 void (*rbd_cb)(struct ceph_osd_request *req,
883 struct ceph_msg *msg),
884 struct ceph_osd_request **linger_req,
885 u64 *ver)
886{
887 struct ceph_osd_request *req;
888 struct ceph_file_layout *layout;
889 int ret;
890 u64 bno;
891 struct timespec mtime = CURRENT_TIME;
892 struct rbd_request *req_data;
893 struct ceph_osd_request_head *reqhead;
894 struct ceph_osd_client *osdc;
895
896 req_data = kzalloc(sizeof(*req_data), GFP_NOIO);
897 if (!req_data) {
898 if (coll)
899 rbd_coll_end_req_index(rq, coll, coll_index,
900 -ENOMEM, len);
901 return -ENOMEM;
902 }
903
904 if (coll) {
905 req_data->coll = coll;
906 req_data->coll_index = coll_index;
907 }
908
909 dout("rbd_do_request obj=%s ofs=%lld len=%lld\n", obj, len, ofs);
910
911 osdc = &dev->rbd_client->client->osdc;
912 req = ceph_osdc_alloc_request(osdc, flags, snapc, ops,
913 false, GFP_NOIO, pages, bio);
914 if (!req) {
915 ret = -ENOMEM;
916 goto done_pages;
917 }
918
919 req->r_callback = rbd_cb;
920
921 req_data->rq = rq;
922 req_data->bio = bio;
923 req_data->pages = pages;
924 req_data->len = len;
925
926 req->r_priv = req_data;
927
928 reqhead = req->r_request->front.iov_base;
929 reqhead->snapid = cpu_to_le64(CEPH_NOSNAP);
930
931 strncpy(req->r_oid, obj, sizeof(req->r_oid));
932 req->r_oid_len = strlen(req->r_oid);
933
934 layout = &req->r_file_layout;
935 memset(layout, 0, sizeof(*layout));
936 layout->fl_stripe_unit = cpu_to_le32(1 << RBD_MAX_OBJ_ORDER);
937 layout->fl_stripe_count = cpu_to_le32(1);
938 layout->fl_object_size = cpu_to_le32(1 << RBD_MAX_OBJ_ORDER);
939 layout->fl_pg_preferred = cpu_to_le32(-1);
940 layout->fl_pg_pool = cpu_to_le32(dev->poolid);
941 ret = ceph_calc_raw_layout(osdc, layout, snapid, ofs, &len, &bno,
942 req, ops);
943 BUG_ON(ret != 0);
944
945 ceph_osdc_build_request(req, ofs, &len,
946 ops,
947 snapc,
948 &mtime,
949 req->r_oid, req->r_oid_len);
950
951 if (linger_req) {
952 ceph_osdc_set_request_linger(osdc, req);
953 *linger_req = req;
954 }
955
956 ret = ceph_osdc_start_request(osdc, req, false);
957 if (ret < 0)
958 goto done_err;
959
960 if (!rbd_cb) {
961 ret = ceph_osdc_wait_request(osdc, req);
962 if (ver)
963 *ver = le64_to_cpu(req->r_reassert_version.version);
964 dout("reassert_ver=%lld\n",
965 le64_to_cpu(req->r_reassert_version.version));
966 ceph_osdc_put_request(req);
967 }
968 return ret;
969
970done_err:
971 bio_chain_put(req_data->bio);
972 ceph_osdc_put_request(req);
973done_pages:
974 rbd_coll_end_req(req_data, ret, len);
975 kfree(req_data);
976 return ret;
977}
978
979/*
980 * Ceph osd op callback
981 */
982static void rbd_req_cb(struct ceph_osd_request *req, struct ceph_msg *msg)
983{
984 struct rbd_request *req_data = req->r_priv;
985 struct ceph_osd_reply_head *replyhead;
986 struct ceph_osd_op *op;
987 __s32 rc;
988 u64 bytes;
989 int read_op;
990
991 /* parse reply */
992 replyhead = msg->front.iov_base;
993 WARN_ON(le32_to_cpu(replyhead->num_ops) == 0);
994 op = (void *)(replyhead + 1);
995 rc = le32_to_cpu(replyhead->result);
996 bytes = le64_to_cpu(op->extent.length);
997 read_op = (le32_to_cpu(op->op) == CEPH_OSD_OP_READ);
998
999 dout("rbd_req_cb bytes=%lld readop=%d rc=%d\n", bytes, read_op, rc);
1000
1001 if (rc == -ENOENT && read_op) {
1002 zero_bio_chain(req_data->bio, 0);
1003 rc = 0;
1004 } else if (rc == 0 && read_op && bytes < req_data->len) {
1005 zero_bio_chain(req_data->bio, bytes);
1006 bytes = req_data->len;
1007 }
1008
1009 rbd_coll_end_req(req_data, rc, bytes);
1010
1011 if (req_data->bio)
1012 bio_chain_put(req_data->bio);
1013
1014 ceph_osdc_put_request(req);
1015 kfree(req_data);
1016}
1017
/*
 * Minimal completion callback: the reply is ignored and only the
 * reference on the osd request is dropped.
 */
static void rbd_simple_req_cb(struct ceph_osd_request *req,
			      struct ceph_msg *msg)
{
	ceph_osdc_put_request(req);
}
1022
1023/*
1024 * Do a synchronous ceph osd operation
1025 */
1026static int rbd_req_sync_op(struct rbd_device *dev,
1027 struct ceph_snap_context *snapc,
1028 u64 snapid,
1029 int opcode,
1030 int flags,
1031 struct ceph_osd_req_op *orig_ops,
1032 int num_reply,
1033 const char *obj,
1034 u64 ofs, u64 len,
1035 char *buf,
1036 struct ceph_osd_request **linger_req,
1037 u64 *ver)
1038{
1039 int ret;
1040 struct page **pages;
1041 int num_pages;
1042 struct ceph_osd_req_op *ops = orig_ops;
1043 u32 payload_len;
1044
1045 num_pages = calc_pages_for(ofs , len);
1046 pages = ceph_alloc_page_vector(num_pages, GFP_KERNEL);
1047 if (IS_ERR(pages))
1048 return PTR_ERR(pages);
1049
1050 if (!orig_ops) {
1051 payload_len = (flags & CEPH_OSD_FLAG_WRITE ? len : 0);
1052 ret = rbd_create_rw_ops(&ops, 1, opcode, payload_len);
1053 if (ret < 0)
1054 goto done;
1055
1056 if ((flags & CEPH_OSD_FLAG_WRITE) && buf) {
1057 ret = ceph_copy_to_page_vector(pages, buf, ofs, len);
1058 if (ret < 0)
1059 goto done_ops;
1060 }
1061 }
1062
1063 ret = rbd_do_request(NULL, dev, snapc, snapid,
1064 obj, ofs, len, NULL,
1065 pages, num_pages,
1066 flags,
1067 ops,
1068 2,
1069 NULL, 0,
1070 NULL,
1071 linger_req, ver);
1072 if (ret < 0)
1073 goto done_ops;
1074
1075 if ((flags & CEPH_OSD_FLAG_READ) && buf)
1076 ret = ceph_copy_from_page_vector(pages, buf, ofs, ret);
1077
1078done_ops:
1079 if (!orig_ops)
1080 rbd_destroy_ops(ops);
1081done:
1082 ceph_release_page_vector(pages, num_pages);
1083 return ret;
1084}
1085
1086/*
1087 * Do an asynchronous ceph osd operation
1088 */
1089static int rbd_do_op(struct request *rq,
1090 struct rbd_device *rbd_dev ,
1091 struct ceph_snap_context *snapc,
1092 u64 snapid,
1093 int opcode, int flags, int num_reply,
1094 u64 ofs, u64 len,
1095 struct bio *bio,
1096 struct rbd_req_coll *coll,
1097 int coll_index)
1098{
1099 char *seg_name;
1100 u64 seg_ofs;
1101 u64 seg_len;
1102 int ret;
1103 struct ceph_osd_req_op *ops;
1104 u32 payload_len;
1105
1106 seg_name = kmalloc(RBD_MAX_SEG_NAME_LEN + 1, GFP_NOIO);
1107 if (!seg_name)
1108 return -ENOMEM;
1109
1110 seg_len = rbd_get_segment(&rbd_dev->header,
1111 rbd_dev->header.block_name,
1112 ofs, len,
1113 seg_name, &seg_ofs);
1114
1115 payload_len = (flags & CEPH_OSD_FLAG_WRITE ? seg_len : 0);
1116
1117 ret = rbd_create_rw_ops(&ops, 1, opcode, payload_len);
1118 if (ret < 0)
1119 goto done;
1120
1121 /* we've taken care of segment sizes earlier when we
1122 cloned the bios. We should never have a segment
1123 truncated at this point */
1124 BUG_ON(seg_len < len);
1125
1126 ret = rbd_do_request(rq, rbd_dev, snapc, snapid,
1127 seg_name, seg_ofs, seg_len,
1128 bio,
1129 NULL, 0,
1130 flags,
1131 ops,
1132 num_reply,
1133 coll, coll_index,
1134 rbd_req_cb, 0, NULL);
1135
1136 rbd_destroy_ops(ops);
1137done:
1138 kfree(seg_name);
1139 return ret;
1140}
1141
1142/*
1143 * Request async osd write
1144 */
1145static int rbd_req_write(struct request *rq,
1146 struct rbd_device *rbd_dev,
1147 struct ceph_snap_context *snapc,
1148 u64 ofs, u64 len,
1149 struct bio *bio,
1150 struct rbd_req_coll *coll,
1151 int coll_index)
1152{
1153 return rbd_do_op(rq, rbd_dev, snapc, CEPH_NOSNAP,
1154 CEPH_OSD_OP_WRITE,
1155 CEPH_OSD_FLAG_WRITE | CEPH_OSD_FLAG_ONDISK,
1156 2,
1157 ofs, len, bio, coll, coll_index);
1158}
1159
1160/*
1161 * Request async osd read
1162 */
1163static int rbd_req_read(struct request *rq,
1164 struct rbd_device *rbd_dev,
1165 u64 snapid,
1166 u64 ofs, u64 len,
1167 struct bio *bio,
1168 struct rbd_req_coll *coll,
1169 int coll_index)
1170{
1171 return rbd_do_op(rq, rbd_dev, NULL,
1172 (snapid ? snapid : CEPH_NOSNAP),
1173 CEPH_OSD_OP_READ,
1174 CEPH_OSD_FLAG_READ,
1175 2,
1176 ofs, len, bio, coll, coll_index);
1177}
1178
1179/*
1180 * Request sync osd read
1181 */
1182static int rbd_req_sync_read(struct rbd_device *dev,
1183 struct ceph_snap_context *snapc,
1184 u64 snapid,
1185 const char *obj,
1186 u64 ofs, u64 len,
1187 char *buf,
1188 u64 *ver)
1189{
1190 return rbd_req_sync_op(dev, NULL,
1191 (snapid ? snapid : CEPH_NOSNAP),
1192 CEPH_OSD_OP_READ,
1193 CEPH_OSD_FLAG_READ,
1194 NULL,
1195 1, obj, ofs, len, buf, NULL, ver);
1196}
1197
1198/*
1199 * Request sync osd watch
1200 */
1201static int rbd_req_sync_notify_ack(struct rbd_device *dev,
1202 u64 ver,
1203 u64 notify_id,
1204 const char *obj)
1205{
1206 struct ceph_osd_req_op *ops;
1207 struct page **pages = NULL;
1208 int ret;
1209
1210 ret = rbd_create_rw_ops(&ops, 1, CEPH_OSD_OP_NOTIFY_ACK, 0);
1211 if (ret < 0)
1212 return ret;
1213
1214 ops[0].watch.ver = cpu_to_le64(ver);
1215 ops[0].watch.cookie = notify_id;
1216 ops[0].watch.flag = 0;
1217
1218 ret = rbd_do_request(NULL, dev, NULL, CEPH_NOSNAP,
1219 obj, 0, 0, NULL,
1220 pages, 0,
1221 CEPH_OSD_FLAG_READ,
1222 ops,
1223 1,
1224 NULL, 0,
1225 rbd_simple_req_cb, 0, NULL);
1226
1227 rbd_destroy_ops(ops);
1228 return ret;
1229}
1230
1231static void rbd_watch_cb(u64 ver, u64 notify_id, u8 opcode, void *data)
1232{
1233 struct rbd_device *dev = (struct rbd_device *)data;
1234 u64 hver;
1235 int rc;
1236
1237 if (!dev)
1238 return;
1239
1240 dout("rbd_watch_cb %s notify_id=%lld opcode=%d\n", dev->obj_md_name,
1241 notify_id, (int)opcode);
1242 mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING);
1243 rc = __rbd_update_snaps(dev);
1244 hver = dev->header.obj_version;
1245 mutex_unlock(&ctl_mutex);
1246 if (rc)
1247 pr_warning(RBD_DRV_NAME "%d got notification but failed to "
1248 " update snaps: %d\n", dev->major, rc);
1249
1250 rbd_req_sync_notify_ack(dev, hver, notify_id, dev->obj_md_name);
1251}
1252
1253/*
1254 * Request sync osd watch
1255 */
1256static int rbd_req_sync_watch(struct rbd_device *dev,
1257 const char *obj,
1258 u64 ver)
1259{
1260 struct ceph_osd_req_op *ops;
1261 struct ceph_osd_client *osdc = &dev->rbd_client->client->osdc;
1262
1263 int ret = rbd_create_rw_ops(&ops, 1, CEPH_OSD_OP_WATCH, 0);
1264 if (ret < 0)
1265 return ret;
1266
1267 ret = ceph_osdc_create_event(osdc, rbd_watch_cb, 0,
1268 (void *)dev, &dev->watch_event);
1269 if (ret < 0)
1270 goto fail;
1271
1272 ops[0].watch.ver = cpu_to_le64(ver);
1273 ops[0].watch.cookie = cpu_to_le64(dev->watch_event->cookie);
1274 ops[0].watch.flag = 1;
1275
1276 ret = rbd_req_sync_op(dev, NULL,
1277 CEPH_NOSNAP,
1278 0,
1279 CEPH_OSD_FLAG_WRITE | CEPH_OSD_FLAG_ONDISK,
1280 ops,
1281 1, obj, 0, 0, NULL,
1282 &dev->watch_request, NULL);
1283
1284 if (ret < 0)
1285 goto fail_event;
1286
1287 rbd_destroy_ops(ops);
1288 return 0;
1289
1290fail_event:
1291 ceph_osdc_cancel_event(dev->watch_event);
1292 dev->watch_event = NULL;
1293fail:
1294 rbd_destroy_ops(ops);
1295 return ret;
1296}
1297
1298/*
1299 * Request sync osd unwatch
1300 */
1301static int rbd_req_sync_unwatch(struct rbd_device *dev,
1302 const char *obj)
1303{
1304 struct ceph_osd_req_op *ops;
1305
1306 int ret = rbd_create_rw_ops(&ops, 1, CEPH_OSD_OP_WATCH, 0);
1307 if (ret < 0)
1308 return ret;
1309
1310 ops[0].watch.ver = 0;
1311 ops[0].watch.cookie = cpu_to_le64(dev->watch_event->cookie);
1312 ops[0].watch.flag = 0;
1313
1314 ret = rbd_req_sync_op(dev, NULL,
1315 CEPH_NOSNAP,
1316 0,
1317 CEPH_OSD_FLAG_WRITE | CEPH_OSD_FLAG_ONDISK,
1318 ops,
1319 1, obj, 0, 0, NULL, NULL, NULL);
1320
1321 rbd_destroy_ops(ops);
1322 ceph_osdc_cancel_event(dev->watch_event);
1323 dev->watch_event = NULL;
1324 return ret;
1325}
1326
#if 0
/*
 * Request sync osd class method call (currently compiled out).
 *
 * Builds a single CEPH_OSD_OP_CALL op naming class @cls and method
 * @method, with @data/@len as the input payload, and submits it
 * synchronously against object @obj.  @ver is passed through to
 * rbd_req_sync_op().
 *
 * Returns the result of rbd_req_sync_op() (or of op allocation).
 */
static int rbd_req_sync_exec(struct rbd_device *dev,
			     const char *obj,
			     const char *cls,
			     const char *method,
			     const char *data,
			     int len,
			     u64 *ver)
{
	struct ceph_osd_req_op *ops;
	int cls_len = strlen(cls);
	int method_len = strlen(method);
	int ret = rbd_create_rw_ops(&ops, 1, CEPH_OSD_OP_CALL,
				    cls_len + method_len + len);
	if (ret < 0)
		return ret;

	ops[0].cls.class_name = cls;
	/* NOTE(review): the __u8 casts truncate names longer than 255 bytes */
	ops[0].cls.class_len = (__u8)cls_len;
	ops[0].cls.method_name = method;
	ops[0].cls.method_len = (__u8)method_len;
	ops[0].cls.argc = 0;
	ops[0].cls.indata = data;
	ops[0].cls.indata_len = len;

	ret = rbd_req_sync_op(dev, NULL,
			       CEPH_NOSNAP,
			       0,
			       CEPH_OSD_FLAG_WRITE | CEPH_OSD_FLAG_ONDISK,
			       ops,
			       1, obj, 0, 0, NULL, NULL, ver);

	rbd_destroy_ops(ops);

	dout("cls_exec returned %d\n", ret);
	return ret;
}
#endif
1368
1369static struct rbd_req_coll *rbd_alloc_coll(int num_reqs)
1370{
1371 struct rbd_req_coll *coll =
1372 kzalloc(sizeof(struct rbd_req_coll) +
1373 sizeof(struct rbd_req_status) * num_reqs,
1374 GFP_ATOMIC);
1375
1376 if (!coll)
1377 return NULL;
1378 coll->total = num_reqs;
1379 kref_init(&coll->kref);
1380 return coll;
1381}
1382
1383/*
1384 * block device queue callback
1385 */
1386static void rbd_rq_fn(struct request_queue *q)
1387{
1388 struct rbd_device *rbd_dev = q->queuedata;
1389 struct request *rq;
1390 struct bio_pair *bp = NULL;
1391
1392 while ((rq = blk_fetch_request(q))) {
1393 struct bio *bio;
1394 struct bio *rq_bio, *next_bio = NULL;
1395 bool do_write;
1396 int size, op_size = 0;
1397 u64 ofs;
1398 int num_segs, cur_seg = 0;
1399 struct rbd_req_coll *coll;
1400 struct ceph_snap_context *snapc;
1401
1402 /* peek at request from block layer */
1403 if (!rq)
1404 break;
1405
1406 dout("fetched request\n");
1407
1408 /* filter out block requests we don't understand */
1409 if ((rq->cmd_type != REQ_TYPE_FS)) {
1410 __blk_end_request_all(rq, 0);
1411 continue;
1412 }
1413
1414 /* deduce our operation (read, write) */
1415 do_write = (rq_data_dir(rq) == WRITE);
1416
1417 size = blk_rq_bytes(rq);
1418 ofs = blk_rq_pos(rq) * SECTOR_SIZE;
1419 rq_bio = rq->bio;
1420 if (do_write && rbd_dev->read_only) {
1421 __blk_end_request_all(rq, -EROFS);
1422 continue;
1423 }
1424
1425 spin_unlock_irq(q->queue_lock);
1426
1427 down_read(&rbd_dev->header_rwsem);
1428
1429 if (rbd_dev->snap_id != CEPH_NOSNAP && !rbd_dev->snap_exists) {
1430 up_read(&rbd_dev->header_rwsem);
1431 dout("request for non-existent snapshot");
1432 spin_lock_irq(q->queue_lock);
1433 __blk_end_request_all(rq, -ENXIO);
1434 continue;
1435 }
1436
1437 snapc = ceph_get_snap_context(rbd_dev->header.snapc);
1438
1439 up_read(&rbd_dev->header_rwsem);
1440
1441 dout("%s 0x%x bytes at 0x%llx\n",
1442 do_write ? "write" : "read",
1443 size, blk_rq_pos(rq) * SECTOR_SIZE);
1444
1445 num_segs = rbd_get_num_segments(&rbd_dev->header, ofs, size);
1446 coll = rbd_alloc_coll(num_segs);
1447 if (!coll) {
1448 spin_lock_irq(q->queue_lock);
1449 __blk_end_request_all(rq, -ENOMEM);
1450 ceph_put_snap_context(snapc);
1451 continue;
1452 }
1453
1454 do {
1455 /* a bio clone to be passed down to OSD req */
1456 dout("rq->bio->bi_vcnt=%d\n", rq->bio->bi_vcnt);
1457 op_size = rbd_get_segment(&rbd_dev->header,
1458 rbd_dev->header.block_name,
1459 ofs, size,
1460 NULL, NULL);
1461 kref_get(&coll->kref);
1462 bio = bio_chain_clone(&rq_bio, &next_bio, &bp,
1463 op_size, GFP_ATOMIC);
1464 if (!bio) {
1465 rbd_coll_end_req_index(rq, coll, cur_seg,
1466 -ENOMEM, op_size);
1467 goto next_seg;
1468 }
1469
1470
1471 /* init OSD command: write or read */
1472 if (do_write)
1473 rbd_req_write(rq, rbd_dev,
1474 snapc,
1475 ofs,
1476 op_size, bio,
1477 coll, cur_seg);
1478 else
1479 rbd_req_read(rq, rbd_dev,
1480 rbd_dev->snap_id,
1481 ofs,
1482 op_size, bio,
1483 coll, cur_seg);
1484
1485next_seg:
1486 size -= op_size;
1487 ofs += op_size;
1488
1489 cur_seg++;
1490 rq_bio = next_bio;
1491 } while (size > 0);
1492 kref_put(&coll->kref, rbd_coll_release);
1493
1494 if (bp)
1495 bio_pair_release(bp);
1496 spin_lock_irq(q->queue_lock);
1497
1498 ceph_put_snap_context(snapc);
1499 }
1500}
1501
1502/*
1503 * a queue callback. Makes sure that we don't create a bio that spans across
1504 * multiple osd objects. One exception would be with a single page bios,
1505 * which we handle later at bio_chain_clone
1506 */
1507static int rbd_merge_bvec(struct request_queue *q, struct bvec_merge_data *bmd,
1508 struct bio_vec *bvec)
1509{
1510 struct rbd_device *rbd_dev = q->queuedata;
1511 unsigned int chunk_sectors;
1512 sector_t sector;
1513 unsigned int bio_sectors;
1514 int max;
1515
1516 chunk_sectors = 1 << (rbd_dev->header.obj_order - SECTOR_SHIFT);
1517 sector = bmd->bi_sector + get_start_sect(bmd->bi_bdev);
1518 bio_sectors = bmd->bi_size >> SECTOR_SHIFT;
1519
1520 max = (chunk_sectors - ((sector & (chunk_sectors - 1))
1521 + bio_sectors)) << SECTOR_SHIFT;
1522 if (max < 0)
1523 max = 0; /* bio_add cannot handle a negative return */
1524 if (max <= bvec->bv_len && bio_sectors == 0)
1525 return bvec->bv_len;
1526 return max;
1527}
1528
1529static void rbd_free_disk(struct rbd_device *rbd_dev)
1530{
1531 struct gendisk *disk = rbd_dev->disk;
1532
1533 if (!disk)
1534 return;
1535
1536 rbd_header_free(&rbd_dev->header);
1537
1538 if (disk->flags & GENHD_FL_UP)
1539 del_gendisk(disk);
1540 if (disk->queue)
1541 blk_cleanup_queue(disk->queue);
1542 put_disk(disk);
1543}
1544
1545/*
1546 * reload the ondisk the header
1547 */
1548static int rbd_read_header(struct rbd_device *rbd_dev,
1549 struct rbd_image_header *header)
1550{
1551 ssize_t rc;
1552 struct rbd_image_header_ondisk *dh;
1553 int snap_count = 0;
1554 u64 ver;
1555 size_t len;
1556
1557 /*
1558 * First reads the fixed-size header to determine the number
1559 * of snapshots, then re-reads it, along with all snapshot
1560 * records as well as their stored names.
1561 */
1562 len = sizeof (*dh);
1563 while (1) {
1564 dh = kmalloc(len, GFP_KERNEL);
1565 if (!dh)
1566 return -ENOMEM;
1567
1568 rc = rbd_req_sync_read(rbd_dev,
1569 NULL, CEPH_NOSNAP,
1570 rbd_dev->obj_md_name,
1571 0, len,
1572 (char *)dh, &ver);
1573 if (rc < 0)
1574 goto out_dh;
1575
1576 rc = rbd_header_from_disk(header, dh, snap_count, GFP_KERNEL);
1577 if (rc < 0) {
1578 if (rc == -ENXIO)
1579 pr_warning("unrecognized header format"
1580 " for image %s", rbd_dev->obj);
1581 goto out_dh;
1582 }
1583
1584 if (snap_count == header->total_snaps)
1585 break;
1586
1587 snap_count = header->total_snaps;
1588 len = sizeof (*dh) +
1589 snap_count * sizeof(struct rbd_image_snap_ondisk) +
1590 header->snap_names_len;
1591
1592 rbd_header_free(header);
1593 kfree(dh);
1594 }
1595 header->obj_version = ver;
1596
1597out_dh:
1598 kfree(dh);
1599 return rc;
1600}
1601
1602static void __rbd_remove_all_snaps(struct rbd_device *rbd_dev)
1603{
1604 struct rbd_snap *snap;
1605
1606 while (!list_empty(&rbd_dev->snaps)) {
1607 snap = list_first_entry(&rbd_dev->snaps, struct rbd_snap, node);
1608 __rbd_remove_snap_dev(rbd_dev, snap);
1609 }
1610}
1611
1612/*
1613 * only read the first part of the ondisk header, without the snaps info
1614 */
1615static int __rbd_update_snaps(struct rbd_device *rbd_dev)
1616{
1617 int ret;
1618 struct rbd_image_header h;
1619 u64 snap_seq;
1620 int follow_seq = 0;
1621
1622 ret = rbd_read_header(rbd_dev, &h);
1623 if (ret < 0)
1624 return ret;
1625
1626 down_write(&rbd_dev->header_rwsem);
1627
1628 /* resized? */
1629 if (rbd_dev->snap_id == CEPH_NOSNAP) {
1630 sector_t size = (sector_t) h.image_size / SECTOR_SIZE;
1631
1632 dout("setting size to %llu sectors", (unsigned long long) size);
1633 set_capacity(rbd_dev->disk, size);
1634 }
1635
1636 snap_seq = rbd_dev->header.snapc->seq;
1637 if (rbd_dev->header.total_snaps &&
1638 rbd_dev->header.snapc->snaps[0] == snap_seq)
1639 /* pointing at the head, will need to follow that
1640 if head moves */
1641 follow_seq = 1;
1642
1643 ceph_put_snap_context(rbd_dev->header.snapc);
1644 kfree(rbd_dev->header.snap_names);
1645 kfree(rbd_dev->header.snap_sizes);
1646
1647 rbd_dev->header.obj_version = h.obj_version;
1648 rbd_dev->header.image_size = h.image_size;
1649 rbd_dev->header.total_snaps = h.total_snaps;
1650 rbd_dev->header.snapc = h.snapc;
1651 rbd_dev->header.snap_names = h.snap_names;
1652 rbd_dev->header.snap_names_len = h.snap_names_len;
1653 rbd_dev->header.snap_sizes = h.snap_sizes;
1654 if (follow_seq)
1655 rbd_dev->header.snapc->seq = rbd_dev->header.snapc->snaps[0];
1656 else
1657 rbd_dev->header.snapc->seq = snap_seq;
1658
1659 ret = __rbd_init_snaps_header(rbd_dev);
1660
1661 up_write(&rbd_dev->header_rwsem);
1662
1663 return ret;
1664}
1665
1666static int rbd_init_disk(struct rbd_device *rbd_dev)
1667{
1668 struct gendisk *disk;
1669 struct request_queue *q;
1670 int rc;
1671 u64 segment_size;
1672 u64 total_size = 0;
1673
1674 /* contact OSD, request size info about the object being mapped */
1675 rc = rbd_read_header(rbd_dev, &rbd_dev->header);
1676 if (rc)
1677 return rc;
1678
1679 /* no need to lock here, as rbd_dev is not registered yet */
1680 rc = __rbd_init_snaps_header(rbd_dev);
1681 if (rc)
1682 return rc;
1683
1684 rc = rbd_header_set_snap(rbd_dev, &total_size);
1685 if (rc)
1686 return rc;
1687
1688 /* create gendisk info */
1689 rc = -ENOMEM;
1690 disk = alloc_disk(RBD_MINORS_PER_MAJOR);
1691 if (!disk)
1692 goto out;
1693
1694 snprintf(disk->disk_name, sizeof(disk->disk_name), RBD_DRV_NAME "%d",
1695 rbd_dev->id);
1696 disk->major = rbd_dev->major;
1697 disk->first_minor = 0;
1698 disk->fops = &rbd_bd_ops;
1699 disk->private_data = rbd_dev;
1700
1701 /* init rq */
1702 rc = -ENOMEM;
1703 q = blk_init_queue(rbd_rq_fn, &rbd_dev->lock);
1704 if (!q)
1705 goto out_disk;
1706
1707 /* We use the default size, but let's be explicit about it. */
1708 blk_queue_physical_block_size(q, SECTOR_SIZE);
1709
1710 /* set io sizes to object size */
1711 segment_size = rbd_obj_bytes(&rbd_dev->header);
1712 blk_queue_max_hw_sectors(q, segment_size / SECTOR_SIZE);
1713 blk_queue_max_segment_size(q, segment_size);
1714 blk_queue_io_min(q, segment_size);
1715 blk_queue_io_opt(q, segment_size);
1716
1717 blk_queue_merge_bvec(q, rbd_merge_bvec);
1718 disk->queue = q;
1719
1720 q->queuedata = rbd_dev;
1721
1722 rbd_dev->disk = disk;
1723 rbd_dev->q = q;
1724
1725 /* finally, announce the disk to the world */
1726 set_capacity(disk, total_size / SECTOR_SIZE);
1727 add_disk(disk);
1728
1729 pr_info("%s: added with size 0x%llx\n",
1730 disk->disk_name, (unsigned long long)total_size);
1731 return 0;
1732
1733out_disk:
1734 put_disk(disk);
1735out:
1736 return rc;
1737}
1738
1739/*
1740 sysfs
1741*/
1742
1743static struct rbd_device *dev_to_rbd_dev(struct device *dev)
1744{
1745 return container_of(dev, struct rbd_device, dev);
1746}
1747
1748static ssize_t rbd_size_show(struct device *dev,
1749 struct device_attribute *attr, char *buf)
1750{
1751 struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
1752 sector_t size;
1753
1754 down_read(&rbd_dev->header_rwsem);
1755 size = get_capacity(rbd_dev->disk);
1756 up_read(&rbd_dev->header_rwsem);
1757
1758 return sprintf(buf, "%llu\n", (unsigned long long) size * SECTOR_SIZE);
1759}
1760
1761static ssize_t rbd_major_show(struct device *dev,
1762 struct device_attribute *attr, char *buf)
1763{
1764 struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
1765
1766 return sprintf(buf, "%d\n", rbd_dev->major);
1767}
1768
1769static ssize_t rbd_client_id_show(struct device *dev,
1770 struct device_attribute *attr, char *buf)
1771{
1772 struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
1773
1774 return sprintf(buf, "client%lld\n",
1775 ceph_client_id(rbd_dev->rbd_client->client));
1776}
1777
1778static ssize_t rbd_pool_show(struct device *dev,
1779 struct device_attribute *attr, char *buf)
1780{
1781 struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
1782
1783 return sprintf(buf, "%s\n", rbd_dev->pool_name);
1784}
1785
1786static ssize_t rbd_name_show(struct device *dev,
1787 struct device_attribute *attr, char *buf)
1788{
1789 struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
1790
1791 return sprintf(buf, "%s\n", rbd_dev->obj);
1792}
1793
1794static ssize_t rbd_snap_show(struct device *dev,
1795 struct device_attribute *attr,
1796 char *buf)
1797{
1798 struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
1799
1800 return sprintf(buf, "%s\n", rbd_dev->snap_name);
1801}
1802
1803static ssize_t rbd_image_refresh(struct device *dev,
1804 struct device_attribute *attr,
1805 const char *buf,
1806 size_t size)
1807{
1808 struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
1809 int rc;
1810 int ret = size;
1811
1812 mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING);
1813
1814 rc = __rbd_update_snaps(rbd_dev);
1815 if (rc < 0)
1816 ret = rc;
1817
1818 mutex_unlock(&ctl_mutex);
1819 return ret;
1820}
1821
/*
 * Per-device sysfs attributes.  All are read-only except "refresh",
 * which is write-only and triggers rbd_image_refresh().
 */
static DEVICE_ATTR(size, S_IRUGO, rbd_size_show, NULL);
static DEVICE_ATTR(major, S_IRUGO, rbd_major_show, NULL);
static DEVICE_ATTR(client_id, S_IRUGO, rbd_client_id_show, NULL);
static DEVICE_ATTR(pool, S_IRUGO, rbd_pool_show, NULL);
static DEVICE_ATTR(name, S_IRUGO, rbd_name_show, NULL);
static DEVICE_ATTR(refresh, S_IWUSR, NULL, rbd_image_refresh);
static DEVICE_ATTR(current_snap, S_IRUGO, rbd_snap_show, NULL);

/* NULL-terminated list of the attributes above */
static struct attribute *rbd_attrs[] = {
	&dev_attr_size.attr,
	&dev_attr_major.attr,
	&dev_attr_client_id.attr,
	&dev_attr_pool.attr,
	&dev_attr_name.attr,
	&dev_attr_current_snap.attr,
	&dev_attr_refresh.attr,
	NULL
};

static struct attribute_group rbd_attr_group = {
	.attrs = rbd_attrs,
};

/* NULL-terminated group list referenced by rbd_device_type */
static const struct attribute_group *rbd_attr_groups[] = {
	&rbd_attr_group,
	NULL
};
1849
/*
 * Release hook of rbd_device_type.  It has nothing to do: the actual
 * teardown of an rbd device lives in rbd_dev_release(), which
 * rbd_bus_add_dev() installs as dev->release.
 */
static void rbd_sysfs_dev_release(struct device *dev)
{
}
1853
/* Device type of rbd devices; supplies their sysfs attribute groups. */
static struct device_type rbd_device_type = {
	.name		= "rbd",
	.groups		= rbd_attr_groups,
	.release	= rbd_sysfs_dev_release,
};
1859
1860
1861/*
1862 sysfs - snapshots
1863*/
1864
1865static ssize_t rbd_snap_size_show(struct device *dev,
1866 struct device_attribute *attr,
1867 char *buf)
1868{
1869 struct rbd_snap *snap = container_of(dev, struct rbd_snap, dev);
1870
1871 return sprintf(buf, "%zd\n", snap->size);
1872}
1873
1874static ssize_t rbd_snap_id_show(struct device *dev,
1875 struct device_attribute *attr,
1876 char *buf)
1877{
1878 struct rbd_snap *snap = container_of(dev, struct rbd_snap, dev);
1879
1880 return sprintf(buf, "%llu\n", (unsigned long long) snap->id);
1881}
1882
/* Read-only sysfs attributes of a snapshot device. */
static DEVICE_ATTR(snap_size, S_IRUGO, rbd_snap_size_show, NULL);
static DEVICE_ATTR(snap_id, S_IRUGO, rbd_snap_id_show, NULL);

/* NULL-terminated list of the attributes above */
static struct attribute *rbd_snap_attrs[] = {
	&dev_attr_snap_size.attr,
	&dev_attr_snap_id.attr,
	NULL,
};

static struct attribute_group rbd_snap_attr_group = {
	.attrs = rbd_snap_attrs,
};
1895
1896static void rbd_snap_dev_release(struct device *dev)
1897{
1898 struct rbd_snap *snap = container_of(dev, struct rbd_snap, dev);
1899 kfree(snap->name);
1900 kfree(snap);
1901}
1902
/* NULL-terminated group list referenced by rbd_snap_device_type */
static const struct attribute_group *rbd_snap_attr_groups[] = {
	&rbd_snap_attr_group,
	NULL
};

/* Device type of snapshot devices (see rbd_register_snap_dev()) */
static struct device_type rbd_snap_device_type = {
	.groups		= rbd_snap_attr_groups,
	.release	= rbd_snap_dev_release,
};
1912
1913static void __rbd_remove_snap_dev(struct rbd_device *rbd_dev,
1914 struct rbd_snap *snap)
1915{
1916 list_del(&snap->node);
1917 device_unregister(&snap->dev);
1918}
1919
1920static int rbd_register_snap_dev(struct rbd_device *rbd_dev,
1921 struct rbd_snap *snap,
1922 struct device *parent)
1923{
1924 struct device *dev = &snap->dev;
1925 int ret;
1926
1927 dev->type = &rbd_snap_device_type;
1928 dev->parent = parent;
1929 dev->release = rbd_snap_dev_release;
1930 dev_set_name(dev, "snap_%s", snap->name);
1931 ret = device_register(dev);
1932
1933 return ret;
1934}
1935
1936static int __rbd_add_snap_dev(struct rbd_device *rbd_dev,
1937 int i, const char *name,
1938 struct rbd_snap **snapp)
1939{
1940 int ret;
1941 struct rbd_snap *snap = kzalloc(sizeof(*snap), GFP_KERNEL);
1942 if (!snap)
1943 return -ENOMEM;
1944 snap->name = kstrdup(name, GFP_KERNEL);
1945 snap->size = rbd_dev->header.snap_sizes[i];
1946 snap->id = rbd_dev->header.snapc->snaps[i];
1947 if (device_is_registered(&rbd_dev->dev)) {
1948 ret = rbd_register_snap_dev(rbd_dev, snap,
1949 &rbd_dev->dev);
1950 if (ret < 0)
1951 goto err;
1952 }
1953 *snapp = snap;
1954 return 0;
1955err:
1956 kfree(snap->name);
1957 kfree(snap);
1958 return ret;
1959}
1960
/*
 * Step backwards through a list of '\0'-terminated names stored back
 * to back, beginning at @start.
 *
 * @name points at the start of an entry (or just past the final
 * terminator of the list).  Returns the start of the entry preceding
 * it, or NULL when @name is less than two bytes beyond @start and so
 * cannot have a predecessor.
 */
const char *rbd_prev_snap_name(const char *name, const char *start)
{
	const char *cur;

	if (name < start + 2)
		return NULL;

	/* begin on the last character of the previous entry */
	for (cur = name - 2; *cur != '\0'; cur--) {
		if (cur == start)
			return start;
	}

	/* stopped on the terminator of the entry before the one we want */
	return cur + 1;
}
1977
/*
 * compare the old list of snapshots that we have to what's in the header
 * and update it accordingly. Note that the header holds the snapshots
 * in a reverse order (from newest to oldest) and we need to go from
 * older to new so that we don't get a duplicate snap name when
 * doing the process (e.g., removed snapshot and recreated a new
 * one with the same name.
 *
 * Both sequences are walked backwards in lock step: the existing list
 * rbd_dev->snaps from its tail, and the header's snapshot ids / names
 * from index total_snaps - 1 down to 0.  Existing entries missing from
 * the header are unregistered, header entries missing from the list
 * are created via __rbd_add_snap_dev().
 *
 * Returns 0 on success, -EINVAL if the name list runs out before the
 * id list does, or the error from __rbd_add_snap_dev().
 */
static int __rbd_init_snaps_header(struct rbd_device *rbd_dev)
{
	const char *name, *first_name;
	int i = rbd_dev->header.total_snaps;
	struct rbd_snap *snap, *old_snap = NULL;
	int ret;
	struct list_head *p, *n;

	/* name starts out just past the end of the name buffer */
	first_name = rbd_dev->header.snap_names;
	name = first_name + rbd_dev->header.snap_names_len;

	list_for_each_prev_safe(p, n, &rbd_dev->snaps) {
		u64 cur_id;

		old_snap = list_entry(p, struct rbd_snap, node);

		/* cur_id stays unset when i == 0; the !i test below guards it */
		if (i)
			cur_id = rbd_dev->header.snapc->snaps[i - 1];

		if (!i || old_snap->id < cur_id) {
			/*
			 * old_snap->id was skipped, thus was
			 * removed.  If this rbd_dev is mapped to
			 * the removed snapshot, record that it no
			 * longer exists, to prevent further I/O.
			 */
			if (rbd_dev->snap_id == old_snap->id)
				rbd_dev->snap_exists = false;
			__rbd_remove_snap_dev(rbd_dev, old_snap);
			continue;
		}
		if (old_snap->id == cur_id) {
			/* we have this snapshot already */
			i--;
			name = rbd_prev_snap_name(name, first_name);
			continue;
		}
		for (; i > 0;
		     i--, name = rbd_prev_snap_name(name, first_name)) {
			if (!name) {
				WARN_ON(1);
				return -EINVAL;
			}
			/*
			 * NOTE(review): this indexes snaps[i] whereas the
			 * code above and the add below use i - 1; with
			 * i == total_snaps that is one past the last id.
			 * Confirm whether snaps[i - 1] was intended.
			 */
			cur_id = rbd_dev->header.snapc->snaps[i];
			/* snapshot removal? handle it above */
			if (cur_id >= old_snap->id)
				break;
			/* a new snapshot */
			ret = __rbd_add_snap_dev(rbd_dev, i - 1, name, &snap);
			if (ret < 0)
				return ret;

			/* note that we add it backward so using n and not p */
			list_add(&snap->node, n);
			p = &snap->node;
		}
	}
	/* we're done going over the old snap list, just add what's left */
	for (; i > 0; i--) {
		name = rbd_prev_snap_name(name, first_name);
		if (!name) {
			WARN_ON(1);
			return -EINVAL;
		}
		ret = __rbd_add_snap_dev(rbd_dev, i - 1, name, &snap);
		if (ret < 0)
			return ret;
		/* inserted at the head of the list */
		list_add(&snap->node, &rbd_dev->snaps);
	}

	return 0;
}
2058
2059static int rbd_bus_add_dev(struct rbd_device *rbd_dev)
2060{
2061 int ret;
2062 struct device *dev;
2063 struct rbd_snap *snap;
2064
2065 mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING);
2066 dev = &rbd_dev->dev;
2067
2068 dev->bus = &rbd_bus_type;
2069 dev->type = &rbd_device_type;
2070 dev->parent = &rbd_root_dev;
2071 dev->release = rbd_dev_release;
2072 dev_set_name(dev, "%d", rbd_dev->id);
2073 ret = device_register(dev);
2074 if (ret < 0)
2075 goto out;
2076
2077 list_for_each_entry(snap, &rbd_dev->snaps, node) {
2078 ret = rbd_register_snap_dev(rbd_dev, snap,
2079 &rbd_dev->dev);
2080 if (ret < 0)
2081 break;
2082 }
2083out:
2084 mutex_unlock(&ctl_mutex);
2085 return ret;
2086}
2087
2088static void rbd_bus_del_dev(struct rbd_device *rbd_dev)
2089{
2090 device_unregister(&rbd_dev->dev);
2091}
2092
2093static int rbd_init_watch_dev(struct rbd_device *rbd_dev)
2094{
2095 int ret, rc;
2096
2097 do {
2098 ret = rbd_req_sync_watch(rbd_dev, rbd_dev->obj_md_name,
2099 rbd_dev->header.obj_version);
2100 if (ret == -ERANGE) {
2101 mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING);
2102 rc = __rbd_update_snaps(rbd_dev);
2103 mutex_unlock(&ctl_mutex);
2104 if (rc < 0)
2105 return rc;
2106 }
2107 } while (ret == -ERANGE);
2108
2109 return ret;
2110}
2111
2112static atomic64_t rbd_id_max = ATOMIC64_INIT(0);
2113
2114/*
2115 * Get a unique rbd identifier for the given new rbd_dev, and add
2116 * the rbd_dev to the global list. The minimum rbd id is 1.
2117 */
2118static void rbd_id_get(struct rbd_device *rbd_dev)
2119{
2120 rbd_dev->id = atomic64_inc_return(&rbd_id_max);
2121
2122 spin_lock(&rbd_dev_list_lock);
2123 list_add_tail(&rbd_dev->node, &rbd_dev_list);
2124 spin_unlock(&rbd_dev_list_lock);
2125}
2126
2127/*
2128 * Remove an rbd_dev from the global list, and record that its
2129 * identifier is no longer in use.
2130 */
2131static void rbd_id_put(struct rbd_device *rbd_dev)
2132{
2133 struct list_head *tmp;
2134 int rbd_id = rbd_dev->id;
2135 int max_id;
2136
2137 BUG_ON(rbd_id < 1);
2138
2139 spin_lock(&rbd_dev_list_lock);
2140 list_del_init(&rbd_dev->node);
2141
2142 /*
2143 * If the id being "put" is not the current maximum, there
2144 * is nothing special we need to do.
2145 */
2146 if (rbd_id != atomic64_read(&rbd_id_max)) {
2147 spin_unlock(&rbd_dev_list_lock);
2148 return;
2149 }
2150
2151 /*
2152 * We need to update the current maximum id. Search the
2153 * list to find out what it is. We're more likely to find
2154 * the maximum at the end, so search the list backward.
2155 */
2156 max_id = 0;
2157 list_for_each_prev(tmp, &rbd_dev_list) {
2158 struct rbd_device *rbd_dev;
2159
2160 rbd_dev = list_entry(tmp, struct rbd_device, node);
2161 if (rbd_dev->id > max_id)
2162 max_id = rbd_dev->id;
2163 }
2164 spin_unlock(&rbd_dev_list_lock);
2165
2166 /*
2167 * The max id could have been updated by rbd_id_get(), in
2168 * which case it now accurately reflects the new maximum.
2169 * Be careful not to overwrite the maximum value in that
2170 * case.
2171 */
2172 atomic64_cmpxchg(&rbd_id_max, rbd_id, max_id);
2173}
2174
/*
 * Advance *buf past any leading white space so that it points at the
 * first non-space character (or at the terminating '\0' if there is
 * none), and return the length of the token - the run of non-space
 * characters - that starts there.  *buf must be '\0'-terminated.
 */
static inline size_t next_token(const char **buf)
{
	/*
	 * The characters for which isspace() is nonzero in the "C" and
	 * "POSIX" locales.
	 */
	const char *whitespace = " \f\n\r\t\v";
	const char *token = *buf;

	token += strspn(token, whitespace);	/* start of the token */
	*buf = token;

	return strcspn(token, whitespace);	/* its length */
}
2193
/*
 * Locate the next token in *buf and, provided it fits, copy it into
 * @token as a '\0'-terminated string.  *buf must be '\0'-terminated
 * on entry.
 *
 * Returns the length of the token found, not counting the '\0'.  The
 * result is 0 when there is no token, and >= @token_size when the
 * token (plus terminator) does not fit - in which case @token is left
 * untouched.
 *
 * *buf is always advanced to just beyond the token, even when the
 * token was too long to be copied.
 */
static inline size_t copy_token(const char **buf,
				char *token,
				size_t token_size)
{
	size_t len = next_token(buf);
	const char *found = *buf;

	*buf = found + len;
	if (len >= token_size)
		return len;

	memcpy(token, found, len);
	token[len] = '\0';

	return len;
}
2223
2224/*
2225 * This fills in the pool_name, obj, obj_len, snap_name, obj_len,
2226 * rbd_dev, rbd_md_name, and name fields of the given rbd_dev, based
2227 * on the list of monitor addresses and other options provided via
2228 * /sys/bus/rbd/add.
2229 */
2230static int rbd_add_parse_args(struct rbd_device *rbd_dev,
2231 const char *buf,
2232 const char **mon_addrs,
2233 size_t *mon_addrs_size,
2234 char *options,
2235 size_t options_size)
2236{
2237 size_t len;
2238
2239 /* The first four tokens are required */
2240
2241 len = next_token(&buf);
2242 if (!len)
2243 return -EINVAL;
2244 *mon_addrs_size = len + 1;
2245 *mon_addrs = buf;
2246
2247 buf += len;
2248
2249 len = copy_token(&buf, options, options_size);
2250 if (!len || len >= options_size)
2251 return -EINVAL;
2252
2253 len = copy_token(&buf, rbd_dev->pool_name, sizeof (rbd_dev->pool_name));
2254 if (!len || len >= sizeof (rbd_dev->pool_name))
2255 return -EINVAL;
2256
2257 len = copy_token(&buf, rbd_dev->obj, sizeof (rbd_dev->obj));
2258 if (!len || len >= sizeof (rbd_dev->obj))
2259 return -EINVAL;
2260
2261 /* We have the object length in hand, save it. */
2262
2263 rbd_dev->obj_len = len;
2264
2265 BUILD_BUG_ON(RBD_MAX_MD_NAME_LEN
2266 < RBD_MAX_OBJ_NAME_LEN + sizeof (RBD_SUFFIX));
2267 sprintf(rbd_dev->obj_md_name, "%s%s", rbd_dev->obj, RBD_SUFFIX);
2268
2269 /*
2270 * The snapshot name is optional, but it's an error if it's
2271 * too long. If no snapshot is supplied, fill in the default.
2272 */
2273 len = copy_token(&buf, rbd_dev->snap_name, sizeof (rbd_dev->snap_name));
2274 if (!len)
2275 memcpy(rbd_dev->snap_name, RBD_SNAP_HEAD_NAME,
2276 sizeof (RBD_SNAP_HEAD_NAME));
2277 else if (len >= sizeof (rbd_dev->snap_name))
2278 return -EINVAL;
2279
2280 return 0;
2281}
2282
2283static ssize_t rbd_add(struct bus_type *bus,
2284 const char *buf,
2285 size_t count)
2286{
2287 struct rbd_device *rbd_dev;
2288 const char *mon_addrs = NULL;
2289 size_t mon_addrs_size = 0;
2290 char *options = NULL;
2291 struct ceph_osd_client *osdc;
2292 int rc = -ENOMEM;
2293
2294 if (!try_module_get(THIS_MODULE))
2295 return -ENODEV;
2296
2297 rbd_dev = kzalloc(sizeof(*rbd_dev), GFP_KERNEL);
2298 if (!rbd_dev)
2299 goto err_nomem;
2300 options = kmalloc(count, GFP_KERNEL);
2301 if (!options)
2302 goto err_nomem;
2303
2304 /* static rbd_device initialization */
2305 spin_lock_init(&rbd_dev->lock);
2306 INIT_LIST_HEAD(&rbd_dev->node);
2307 INIT_LIST_HEAD(&rbd_dev->snaps);
2308 init_rwsem(&rbd_dev->header_rwsem);
2309
2310 init_rwsem(&rbd_dev->header_rwsem);
2311
2312 /* generate unique id: find highest unique id, add one */
2313 rbd_id_get(rbd_dev);
2314
2315 /* Fill in the device name, now that we have its id. */
2316 BUILD_BUG_ON(DEV_NAME_LEN
2317 < sizeof (RBD_DRV_NAME) + MAX_INT_FORMAT_WIDTH);
2318 sprintf(rbd_dev->name, "%s%d", RBD_DRV_NAME, rbd_dev->id);
2319
2320 /* parse add command */
2321 rc = rbd_add_parse_args(rbd_dev, buf, &mon_addrs, &mon_addrs_size,
2322 options, count);
2323 if (rc)
2324 goto err_put_id;
2325
2326 rbd_dev->rbd_client = rbd_get_client(mon_addrs, mon_addrs_size - 1,
2327 options);
2328 if (IS_ERR(rbd_dev->rbd_client)) {
2329 rc = PTR_ERR(rbd_dev->rbd_client);
2330 goto err_put_id;
2331 }
2332
2333 /* pick the pool */
2334 osdc = &rbd_dev->rbd_client->client->osdc;
2335 rc = ceph_pg_poolid_by_name(osdc->osdmap, rbd_dev->pool_name);
2336 if (rc < 0)
2337 goto err_out_client;
2338 rbd_dev->poolid = rc;
2339
2340 /* register our block device */
2341 rc = register_blkdev(0, rbd_dev->name);
2342 if (rc < 0)
2343 goto err_out_client;
2344 rbd_dev->major = rc;
2345
2346 rc = rbd_bus_add_dev(rbd_dev);
2347 if (rc)
2348 goto err_out_blkdev;
2349
2350 /*
2351 * At this point cleanup in the event of an error is the job
2352 * of the sysfs code (initiated by rbd_bus_del_dev()).
2353 *
2354 * Set up and announce blkdev mapping.
2355 */
2356 rc = rbd_init_disk(rbd_dev);
2357 if (rc)
2358 goto err_out_bus;
2359
2360 rc = rbd_init_watch_dev(rbd_dev);
2361 if (rc)
2362 goto err_out_bus;
2363
2364 return count;
2365
2366err_out_bus:
2367 /* this will also clean up rest of rbd_dev stuff */
2368
2369 rbd_bus_del_dev(rbd_dev);
2370 kfree(options);
2371 return rc;
2372
2373err_out_blkdev:
2374 unregister_blkdev(rbd_dev->major, rbd_dev->name);
2375err_out_client:
2376 rbd_put_client(rbd_dev);
2377err_put_id:
2378 rbd_id_put(rbd_dev);
2379err_nomem:
2380 kfree(options);
2381 kfree(rbd_dev);
2382
2383 dout("Error adding device %s\n", buf);
2384 module_put(THIS_MODULE);
2385
2386 return (ssize_t) rc;
2387}
2388
2389static struct rbd_device *__rbd_get_dev(unsigned long id)
2390{
2391 struct list_head *tmp;
2392 struct rbd_device *rbd_dev;
2393
2394 spin_lock(&rbd_dev_list_lock);
2395 list_for_each(tmp, &rbd_dev_list) {
2396 rbd_dev = list_entry(tmp, struct rbd_device, node);
2397 if (rbd_dev->id == id) {
2398 spin_unlock(&rbd_dev_list_lock);
2399 return rbd_dev;
2400 }
2401 }
2402 spin_unlock(&rbd_dev_list_lock);
2403 return NULL;
2404}
2405
/*
 * Release hook of an rbd device (installed as dev->release by
 * rbd_bus_add_dev()).  Tears down everything that belongs to the
 * device and frees the rbd_device itself, so @dev must not be touched
 * afterwards.
 */
static void rbd_dev_release(struct device *dev)
{
	struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);

	/* stop watching the header object while the client is still held */
	if (rbd_dev->watch_request) {
		struct ceph_client *client = rbd_dev->rbd_client->client;

		ceph_osdc_unregister_linger_request(&client->osdc,
						    rbd_dev->watch_request);
	}
	if (rbd_dev->watch_event)
		rbd_req_sync_unwatch(rbd_dev, rbd_dev->obj_md_name);

	rbd_put_client(rbd_dev);

	/* clean up and free blkdev */
	rbd_free_disk(rbd_dev);
	unregister_blkdev(rbd_dev->major, rbd_dev->name);

	/* done with the id, and with the rbd_dev */
	rbd_id_put(rbd_dev);
	kfree(rbd_dev);

	/* release module ref */
	module_put(THIS_MODULE);
}
2432
2433static ssize_t rbd_remove(struct bus_type *bus,
2434 const char *buf,
2435 size_t count)
2436{
2437 struct rbd_device *rbd_dev = NULL;
2438 int target_id, rc;
2439 unsigned long ul;
2440 int ret = count;
2441
2442 rc = strict_strtoul(buf, 10, &ul);
2443 if (rc)
2444 return rc;
2445
2446 /* convert to int; abort if we lost anything in the conversion */
2447 target_id = (int) ul;
2448 if (target_id != ul)
2449 return -EINVAL;
2450
2451 mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING);
2452
2453 rbd_dev = __rbd_get_dev(target_id);
2454 if (!rbd_dev) {
2455 ret = -ENOENT;
2456 goto done;
2457 }
2458
2459 if (rbd_dev->open_count) {
2460 ret = -EBUSY;
2461 goto done;
2462 }
2463
2464 __rbd_remove_all_snaps(rbd_dev);
2465 rbd_bus_del_dev(rbd_dev);
2466
2467done:
2468 mutex_unlock(&ctl_mutex);
2469 return ret;
2470}
2471
2472/*
2473 * create control files in sysfs
2474 * /sys/bus/rbd/...
2475 */
2476static int rbd_sysfs_init(void)
2477{
2478 int ret;
2479
2480 ret = device_register(&rbd_root_dev);
2481 if (ret < 0)
2482 return ret;
2483
2484 ret = bus_register(&rbd_bus_type);
2485 if (ret < 0)
2486 device_unregister(&rbd_root_dev);
2487
2488 return ret;
2489}
2490
/*
 * Undo rbd_sysfs_init(): unregister the bus and the root device, in
 * the reverse of the order in which they were registered.
 */
static void rbd_sysfs_cleanup(void)
{
	bus_unregister(&rbd_bus_type);
	device_unregister(&rbd_root_dev);
}
2496
2497int __init rbd_init(void)
2498{
2499 int rc;
2500
2501 rc = rbd_sysfs_init();
2502 if (rc)
2503 return rc;
2504 pr_info("loaded " RBD_DRV_NAME_LONG "\n");
2505 return 0;
2506}
2507
/* Module exit point: remove the sysfs interface set up by rbd_init(). */
void __exit rbd_exit(void)
{
	rbd_sysfs_cleanup();
}
2512
/* module entry/exit points and module metadata */
module_init(rbd_init);
module_exit(rbd_exit);

MODULE_AUTHOR("Sage Weil <sage@newdream.net>");
MODULE_AUTHOR("Yehuda Sadeh <yehuda@hq.newdream.net>");
MODULE_DESCRIPTION("rados block device");

/* following authorship retained from original osdblk.c */
MODULE_AUTHOR("Jeff Garzik <jeff@garzik.org>");

MODULE_LICENSE("GPL");