2/*
3 rbd.c -- Export ceph rados objects as a Linux block device
4
5
6 based on drivers/block/osdblk.c:
7
8 Copyright 2009 Red Hat, Inc.
9
10 This program is free software; you can redistribute it and/or modify
11 it under the terms of the GNU General Public License as published by
12 the Free Software Foundation.
13
14 This program is distributed in the hope that it will be useful,
15 but WITHOUT ANY WARRANTY; without even the implied warranty of
16 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
17 GNU General Public License for more details.
18
19 You should have received a copy of the GNU General Public License
20 along with this program; see the file COPYING. If not, write to
21 the Free Software Foundation, 675 Mass Ave, Cambridge, MA 02139, USA.
22
23
24
25 For usage instructions, please refer to:
26
27 Documentation/ABI/testing/sysfs-bus-rbd
28
29 */
30
31#include <linux/ceph/libceph.h>
32#include <linux/ceph/osd_client.h>
33#include <linux/ceph/mon_client.h>
34#include <linux/ceph/cls_lock_client.h>
35#include <linux/ceph/decode.h>
36#include <linux/parser.h>
37#include <linux/bsearch.h>
38
39#include <linux/kernel.h>
40#include <linux/device.h>
41#include <linux/module.h>
42#include <linux/blk-mq.h>
43#include <linux/fs.h>
44#include <linux/blkdev.h>
45#include <linux/slab.h>
46#include <linux/idr.h>
47#include <linux/workqueue.h>
48
49#include "rbd_types.h"
50
51#define RBD_DEBUG /* Activate rbd_assert() calls */
52
53/*
54 * Increment the given counter and return its updated value.
55 * If the counter is already 0 it will not be incremented.
56 * If the counter is already at its maximum value, -EINVAL is
57 * returned without updating it.
58 */
59static int atomic_inc_return_safe(atomic_t *v)
60{
61 unsigned int counter;
62
63 counter = (unsigned int)__atomic_add_unless(v, 1, 0);
64 if (counter <= (unsigned int)INT_MAX)
65 return (int)counter;
66
67 atomic_dec(v);
68
69 return -EINVAL;
70}
71
72/* Decrement the counter. Return the resulting value, or -EINVAL */
73static int atomic_dec_return_safe(atomic_t *v)
74{
75 int counter;
76
77 counter = atomic_dec_return(v);
78 if (counter >= 0)
79 return counter;
80
81 atomic_inc(v);
82
83 return -EINVAL;
84}
85
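/*
 * Illustrative sketch (not part of the driver): together, the two
 * helpers above act as a saturating get/put pair for a counter that
 * must never be revived once it has dropped to zero, e.g. a parent
 * image reference count:
 *
 *	if (atomic_inc_return_safe(&refcount) > 0)
 *		use_parent();			// reference taken
 *	...
 *	if (atomic_dec_return_safe(&refcount) == 0)
 *		teardown_parent();		// last reference dropped
 *
 * use_parent()/teardown_parent() are hypothetical names used only
 * for this example.
 */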
86#define RBD_DRV_NAME "rbd"
87
88#define RBD_MINORS_PER_MAJOR 256
89#define RBD_SINGLE_MAJOR_PART_SHIFT 4
90
91#define RBD_MAX_PARENT_CHAIN_LEN 16
92
93#define RBD_SNAP_DEV_NAME_PREFIX "snap_"
94#define RBD_MAX_SNAP_NAME_LEN \
95 (NAME_MAX - (sizeof (RBD_SNAP_DEV_NAME_PREFIX) - 1))
96
97#define RBD_MAX_SNAP_COUNT 510 /* allows max snapc to fit in 4KB */
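/*
 * Rough arithmetic behind the 510 limit (a sketch, assuming the usual
 * 64-bit snapshot ids): 510 ids take 510 * 8 = 4080 bytes, leaving a
 * handful of bytes for the snapshot context's sequence/count header
 * within a 4KB buffer.
 */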
98
99#define RBD_SNAP_HEAD_NAME "-"
100
101#define BAD_SNAP_INDEX U32_MAX /* invalid index into snap array */
102
103/* This allows a single page to hold an image name sent by an OSD */
104#define RBD_IMAGE_NAME_LEN_MAX (PAGE_SIZE - sizeof (__le32) - 1)
105#define RBD_IMAGE_ID_LEN_MAX 64
106
107#define RBD_OBJ_PREFIX_LEN_MAX 64
108
109#define RBD_NOTIFY_TIMEOUT 5 /* seconds */
110#define RBD_RETRY_DELAY msecs_to_jiffies(1000)
111
112/* Feature bits */
113
114#define RBD_FEATURE_LAYERING (1ULL<<0)
115#define RBD_FEATURE_STRIPINGV2 (1ULL<<1)
116#define RBD_FEATURE_EXCLUSIVE_LOCK (1ULL<<2)
117#define RBD_FEATURE_DATA_POOL (1ULL<<7)
118#define RBD_FEATURE_OPERATIONS (1ULL<<8)
119
120#define RBD_FEATURES_ALL (RBD_FEATURE_LAYERING | \
121 RBD_FEATURE_STRIPINGV2 | \
122 RBD_FEATURE_EXCLUSIVE_LOCK | \
123 RBD_FEATURE_DATA_POOL | \
124 RBD_FEATURE_OPERATIONS)
125
126/* Features supported by this (client software) implementation. */
127
128#define RBD_FEATURES_SUPPORTED (RBD_FEATURES_ALL)
129
130/*
131 * An RBD device name will be "rbd#", where the "rbd" comes from
132 * RBD_DRV_NAME above, and # is a unique integer identifier.
133 */
134#define DEV_NAME_LEN 32
135
136/*
137 * block device image metadata (in-memory version)
138 */
139struct rbd_image_header {
140 /* These six fields never change for a given rbd image */
141 char *object_prefix;
142 __u8 obj_order;
143 u64 stripe_unit;
144 u64 stripe_count;
145 s64 data_pool_id;
146 u64 features; /* Might be changeable someday? */
147
148 /* The remaining fields need to be updated occasionally */
149 u64 image_size;
150 struct ceph_snap_context *snapc;
151 char *snap_names; /* format 1 only */
152 u64 *snap_sizes; /* format 1 only */
153};
154
155/*
156 * An rbd image specification.
157 *
158 * The tuple (pool_id, image_id, snap_id) is sufficient to uniquely
159 * identify an image. Each rbd_dev structure includes a pointer to
160 * an rbd_spec structure that encapsulates this identity.
161 *
162 * Each of the ids in an rbd_spec has an associated name. For a
163 * user-mapped image, the names are supplied and the ids associated
164 * with them are looked up. For a layered image, a parent image is
165 * defined by the tuple, and the names are looked up.
166 *
167 * An rbd_dev structure contains a parent_spec pointer which is
168 * non-null if the image it represents is a child in a layered
169 * image. This pointer will refer to the rbd_spec structure used
170 * by the parent rbd_dev for its own identity (i.e., the structure
171 * is shared between the parent and child).
172 *
173 * Since these structures are populated once, during the discovery
174 * phase of image construction, they are effectively immutable so
175 * we make no effort to synchronize access to them.
176 *
177 * Note that code herein does not assume the image name is known (it
178 * could be a null pointer).
179 */
180struct rbd_spec {
181 u64 pool_id;
182 const char *pool_name;
183
184 const char *image_id;
185 const char *image_name;
186
187 u64 snap_id;
188 const char *snap_name;
189
190 struct kref kref;
191};
192
193/*
194 * an instance of the client; multiple devices may share an rbd client.
195 */
196struct rbd_client {
197 struct ceph_client *client;
198 struct kref kref;
199 struct list_head node;
200};
201
202struct rbd_img_request;
203typedef void (*rbd_img_callback_t)(struct rbd_img_request *);
204
205#define BAD_WHICH U32_MAX /* Good which or bad which, which? */
206
207struct rbd_obj_request;
208typedef void (*rbd_obj_callback_t)(struct rbd_obj_request *);
209
210enum obj_request_type {
211 OBJ_REQUEST_NODATA, OBJ_REQUEST_BIO, OBJ_REQUEST_PAGES
212};
213
214enum obj_operation_type {
215 OBJ_OP_WRITE,
216 OBJ_OP_READ,
217 OBJ_OP_DISCARD,
218};
219
220enum obj_req_flags {
221 OBJ_REQ_DONE, /* completion flag: not done = 0, done = 1 */
222 OBJ_REQ_IMG_DATA, /* object usage: standalone = 0, image = 1 */
223 OBJ_REQ_KNOWN, /* EXISTS flag valid: no = 0, yes = 1 */
224 OBJ_REQ_EXISTS, /* target exists: no = 0, yes = 1 */
225};
226
227struct rbd_obj_request {
228 u64 object_no;
229 u64 offset; /* object start byte */
230 u64 length; /* bytes from offset */
231 unsigned long flags;
232
233 /*
234 * An object request associated with an image will have its
235 * img_data flag set; a standalone object request will not.
236 *
237 * A standalone object request will have which == BAD_WHICH
238 * and a null obj_request pointer.
239 *
240 * An object request initiated in support of a layered image
241 * object (to check for its existence before a write) will
242 * have which == BAD_WHICH and a non-null obj_request pointer.
243 *
244 * Finally, an object request for rbd image data will have
245 * which != BAD_WHICH, and will have a non-null img_request
246 * pointer. The value of which will be in the range
247 * 0..(img_request->obj_request_count-1).
248 */
249 union {
250 struct rbd_obj_request *obj_request; /* STAT op */
251 struct {
252 struct rbd_img_request *img_request;
253 u64 img_offset;
254 /* links for img_request->obj_requests list */
255 struct list_head links;
256 };
257 };
258	u32			which;		/* position in image request list */
259
260 enum obj_request_type type;
261 union {
262 struct bio *bio_list;
263 struct {
264 struct page **pages;
265 u32 page_count;
266 };
267 };
268 struct page **copyup_pages;
269 u32 copyup_page_count;
270
271 struct ceph_osd_request *osd_req;
272
273 u64 xferred; /* bytes transferred */
274 int result;
275
276 rbd_obj_callback_t callback;
277 struct completion completion;
278
279 struct kref kref;
280};
281
282enum img_req_flags {
283 IMG_REQ_WRITE, /* I/O direction: read = 0, write = 1 */
284 IMG_REQ_CHILD, /* initiator: block = 0, child image = 1 */
285 IMG_REQ_LAYERED, /* ENOENT handling: normal = 0, layered = 1 */
286 IMG_REQ_DISCARD, /* discard: normal = 0, discard request = 1 */
287};
288
289struct rbd_img_request {
290 struct rbd_device *rbd_dev;
291 u64 offset; /* starting image byte offset */
292 u64 length; /* byte count from offset */
293 unsigned long flags;
294 union {
295 u64 snap_id; /* for reads */
296 struct ceph_snap_context *snapc; /* for writes */
297 };
298 union {
299 struct request *rq; /* block request */
300 struct rbd_obj_request *obj_request; /* obj req initiator */
301 };
302 struct page **copyup_pages;
303 u32 copyup_page_count;
304 spinlock_t completion_lock;/* protects next_completion */
305 u32 next_completion;
306 rbd_img_callback_t callback;
307 u64 xferred;/* aggregate bytes transferred */
308 int result; /* first nonzero obj_request result */
309
310 u32 obj_request_count;
311 struct list_head obj_requests; /* rbd_obj_request structs */
312
313 struct kref kref;
314};
315
316#define for_each_obj_request(ireq, oreq) \
317 list_for_each_entry(oreq, &(ireq)->obj_requests, links)
318#define for_each_obj_request_from(ireq, oreq) \
319 list_for_each_entry_from(oreq, &(ireq)->obj_requests, links)
320#define for_each_obj_request_safe(ireq, oreq, n) \
321 list_for_each_entry_safe_reverse(oreq, n, &(ireq)->obj_requests, links)
322
323enum rbd_watch_state {
324 RBD_WATCH_STATE_UNREGISTERED,
325 RBD_WATCH_STATE_REGISTERED,
326 RBD_WATCH_STATE_ERROR,
327};
328
329enum rbd_lock_state {
330 RBD_LOCK_STATE_UNLOCKED,
331 RBD_LOCK_STATE_LOCKED,
332 RBD_LOCK_STATE_RELEASING,
333};
334
335/* WatchNotify::ClientId */
336struct rbd_client_id {
337 u64 gid;
338 u64 handle;
339};
340
341struct rbd_mapping {
342 u64 size;
343 u64 features;
344 bool read_only;
345};
346
347/*
348 * a single device
349 */
350struct rbd_device {
351 int dev_id; /* blkdev unique id */
352
353 int major; /* blkdev assigned major */
354 int minor;
355 struct gendisk *disk; /* blkdev's gendisk and rq */
356
357 u32 image_format; /* Either 1 or 2 */
358 struct rbd_client *rbd_client;
359
360 char name[DEV_NAME_LEN]; /* blkdev name, e.g. rbd3 */
361
362 spinlock_t lock; /* queue, flags, open_count */
363
364 struct rbd_image_header header;
365 unsigned long flags; /* possibly lock protected */
366 struct rbd_spec *spec;
367 struct rbd_options *opts;
368 char *config_info; /* add{,_single_major} string */
369
370 struct ceph_object_id header_oid;
371 struct ceph_object_locator header_oloc;
372
373 struct ceph_file_layout layout; /* used for all rbd requests */
374
375 struct mutex watch_mutex;
376 enum rbd_watch_state watch_state;
377 struct ceph_osd_linger_request *watch_handle;
378 u64 watch_cookie;
379 struct delayed_work watch_dwork;
380
381 struct rw_semaphore lock_rwsem;
382 enum rbd_lock_state lock_state;
383 char lock_cookie[32];
384 struct rbd_client_id owner_cid;
385 struct work_struct acquired_lock_work;
386 struct work_struct released_lock_work;
387 struct delayed_work lock_dwork;
388 struct work_struct unlock_work;
389 wait_queue_head_t lock_waitq;
390
391 struct workqueue_struct *task_wq;
392
393 struct rbd_spec *parent_spec;
394 u64 parent_overlap;
395 atomic_t parent_ref;
396 struct rbd_device *parent;
397
398 /* Block layer tags. */
399 struct blk_mq_tag_set tag_set;
400
401 /* protects updating the header */
402 struct rw_semaphore header_rwsem;
403
404 struct rbd_mapping mapping;
405
406 struct list_head node;
407
408 /* sysfs related */
409 struct device dev;
410 unsigned long open_count; /* protected by lock */
411};
412
413/*
414 * Flag bits for rbd_dev->flags:
415 * - REMOVING (which is coupled with rbd_dev->open_count) is protected
416 * by rbd_dev->lock
417 * - BLACKLISTED is protected by rbd_dev->lock_rwsem
418 */
419enum rbd_dev_flags {
420 RBD_DEV_FLAG_EXISTS, /* mapped snapshot has not been deleted */
421 RBD_DEV_FLAG_REMOVING, /* this mapping is being removed */
422 RBD_DEV_FLAG_BLACKLISTED, /* our ceph_client is blacklisted */
423};
424
425static DEFINE_MUTEX(client_mutex); /* Serialize client creation */
426
427static LIST_HEAD(rbd_dev_list); /* devices */
428static DEFINE_SPINLOCK(rbd_dev_list_lock);
429
430static LIST_HEAD(rbd_client_list); /* clients */
431static DEFINE_SPINLOCK(rbd_client_list_lock);
432
433/* Slab caches for frequently-allocated structures */
434
435static struct kmem_cache *rbd_img_request_cache;
436static struct kmem_cache *rbd_obj_request_cache;
437
438static struct bio_set *rbd_bio_clone;
439
440static int rbd_major;
441static DEFINE_IDA(rbd_dev_id_ida);
442
443static struct workqueue_struct *rbd_wq;
444
445/*
446 * Default to false for now, as single-major requires version >= 0.75 of
447 * the userspace rbd utility.
448 */
449static bool single_major = false;
450module_param(single_major, bool, S_IRUGO);
451MODULE_PARM_DESC(single_major, "Use a single major number for all rbd devices (default: false)");
452
453static int rbd_img_request_submit(struct rbd_img_request *img_request);
454
455static ssize_t rbd_add(struct bus_type *bus, const char *buf,
456 size_t count);
457static ssize_t rbd_remove(struct bus_type *bus, const char *buf,
458 size_t count);
459static ssize_t rbd_add_single_major(struct bus_type *bus, const char *buf,
460 size_t count);
461static ssize_t rbd_remove_single_major(struct bus_type *bus, const char *buf,
462 size_t count);
463static int rbd_dev_image_probe(struct rbd_device *rbd_dev, int depth);
464static void rbd_spec_put(struct rbd_spec *spec);
465
466static int rbd_dev_id_to_minor(int dev_id)
467{
468 return dev_id << RBD_SINGLE_MAJOR_PART_SHIFT;
469}
470
471static int minor_to_rbd_dev_id(int minor)
472{
473 return minor >> RBD_SINGLE_MAJOR_PART_SHIFT;
474}
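/*
 * Worked example: with RBD_SINGLE_MAJOR_PART_SHIFT == 4 each device
 * owns a block of 16 minors, so dev_id 3 maps to minor 48 for the
 * whole disk, minors 49..63 serve partitions rbd3p1..rbd3p15, and any
 * minor in 48..63 maps back to dev_id 3 (e.g. 63 >> 4 == 3).
 */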
475
476static bool __rbd_is_lock_owner(struct rbd_device *rbd_dev)
477{
478 return rbd_dev->lock_state == RBD_LOCK_STATE_LOCKED ||
479 rbd_dev->lock_state == RBD_LOCK_STATE_RELEASING;
480}
481
482static bool rbd_is_lock_owner(struct rbd_device *rbd_dev)
483{
484 bool is_lock_owner;
485
486 down_read(&rbd_dev->lock_rwsem);
487 is_lock_owner = __rbd_is_lock_owner(rbd_dev);
488 up_read(&rbd_dev->lock_rwsem);
489 return is_lock_owner;
490}
491
492static ssize_t rbd_supported_features_show(struct bus_type *bus, char *buf)
493{
494 return sprintf(buf, "0x%llx\n", RBD_FEATURES_SUPPORTED);
495}
496
497static BUS_ATTR(add, S_IWUSR, NULL, rbd_add);
498static BUS_ATTR(remove, S_IWUSR, NULL, rbd_remove);
499static BUS_ATTR(add_single_major, S_IWUSR, NULL, rbd_add_single_major);
500static BUS_ATTR(remove_single_major, S_IWUSR, NULL, rbd_remove_single_major);
501static BUS_ATTR(supported_features, S_IRUGO, rbd_supported_features_show, NULL);
502
503static struct attribute *rbd_bus_attrs[] = {
504 &bus_attr_add.attr,
505 &bus_attr_remove.attr,
506 &bus_attr_add_single_major.attr,
507 &bus_attr_remove_single_major.attr,
508 &bus_attr_supported_features.attr,
509 NULL,
510};
511
512static umode_t rbd_bus_is_visible(struct kobject *kobj,
513 struct attribute *attr, int index)
514{
515 if (!single_major &&
516 (attr == &bus_attr_add_single_major.attr ||
517 attr == &bus_attr_remove_single_major.attr))
518 return 0;
519
520 return attr->mode;
521}
522
523static const struct attribute_group rbd_bus_group = {
524 .attrs = rbd_bus_attrs,
525 .is_visible = rbd_bus_is_visible,
526};
527__ATTRIBUTE_GROUPS(rbd_bus);
528
529static struct bus_type rbd_bus_type = {
530 .name = "rbd",
531 .bus_groups = rbd_bus_groups,
532};
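/*
 * Userspace drives this bus entirely through sysfs; per
 * Documentation/ABI/testing/sysfs-bus-rbd, mapping and unmapping look
 * roughly like this (monitor address, credentials and pool names here
 * are placeholders):
 *
 *	# echo "1.2.3.4:6789 name=admin,secret=<key> rbd foo" > /sys/bus/rbd/add
 *	# echo 0 > /sys/bus/rbd/remove
 */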
533
534static void rbd_root_dev_release(struct device *dev)
535{
536}
537
538static struct device rbd_root_dev = {
539 .init_name = "rbd",
540 .release = rbd_root_dev_release,
541};
542
543static __printf(2, 3)
544void rbd_warn(struct rbd_device *rbd_dev, const char *fmt, ...)
545{
546 struct va_format vaf;
547 va_list args;
548
549 va_start(args, fmt);
550 vaf.fmt = fmt;
551 vaf.va = &args;
552
553 if (!rbd_dev)
554 printk(KERN_WARNING "%s: %pV\n", RBD_DRV_NAME, &vaf);
555 else if (rbd_dev->disk)
556 printk(KERN_WARNING "%s: %s: %pV\n",
557 RBD_DRV_NAME, rbd_dev->disk->disk_name, &vaf);
558 else if (rbd_dev->spec && rbd_dev->spec->image_name)
559 printk(KERN_WARNING "%s: image %s: %pV\n",
560 RBD_DRV_NAME, rbd_dev->spec->image_name, &vaf);
561 else if (rbd_dev->spec && rbd_dev->spec->image_id)
562 printk(KERN_WARNING "%s: id %s: %pV\n",
563 RBD_DRV_NAME, rbd_dev->spec->image_id, &vaf);
564 else /* punt */
565 printk(KERN_WARNING "%s: rbd_dev %p: %pV\n",
566 RBD_DRV_NAME, rbd_dev, &vaf);
567 va_end(args);
568}
569
570#ifdef RBD_DEBUG
571#define rbd_assert(expr) \
572 if (unlikely(!(expr))) { \
573 printk(KERN_ERR "\nAssertion failure in %s() " \
574 "at line %d:\n\n" \
575 "\trbd_assert(%s);\n\n", \
576 __func__, __LINE__, #expr); \
577 BUG(); \
578 }
579#else /* !RBD_DEBUG */
580# define rbd_assert(expr) ((void) 0)
581#endif /* !RBD_DEBUG */
582
583static void rbd_osd_copyup_callback(struct rbd_obj_request *obj_request);
584static int rbd_img_obj_request_submit(struct rbd_obj_request *obj_request);
585static void rbd_img_parent_read(struct rbd_obj_request *obj_request);
586static void rbd_dev_remove_parent(struct rbd_device *rbd_dev);
587
588static int rbd_dev_refresh(struct rbd_device *rbd_dev);
589static int rbd_dev_v2_header_onetime(struct rbd_device *rbd_dev);
590static int rbd_dev_header_info(struct rbd_device *rbd_dev);
591static int rbd_dev_v2_parent_info(struct rbd_device *rbd_dev);
592static const char *rbd_dev_v2_snap_name(struct rbd_device *rbd_dev,
593 u64 snap_id);
594static int _rbd_dev_v2_snap_size(struct rbd_device *rbd_dev, u64 snap_id,
595 u8 *order, u64 *snap_size);
596static int _rbd_dev_v2_snap_features(struct rbd_device *rbd_dev, u64 snap_id,
597 u64 *snap_features);
598
599static int rbd_open(struct block_device *bdev, fmode_t mode)
600{
601 struct rbd_device *rbd_dev = bdev->bd_disk->private_data;
602 bool removing = false;
603
604 if ((mode & FMODE_WRITE) && rbd_dev->mapping.read_only)
605 return -EROFS;
606
607 spin_lock_irq(&rbd_dev->lock);
608 if (test_bit(RBD_DEV_FLAG_REMOVING, &rbd_dev->flags))
609 removing = true;
610 else
611 rbd_dev->open_count++;
612 spin_unlock_irq(&rbd_dev->lock);
613 if (removing)
614 return -ENOENT;
615
616 (void) get_device(&rbd_dev->dev);
617
618 return 0;
619}
620
621static void rbd_release(struct gendisk *disk, fmode_t mode)
622{
623 struct rbd_device *rbd_dev = disk->private_data;
624 unsigned long open_count_before;
625
626 spin_lock_irq(&rbd_dev->lock);
627 open_count_before = rbd_dev->open_count--;
628 spin_unlock_irq(&rbd_dev->lock);
629 rbd_assert(open_count_before > 0);
630
631 put_device(&rbd_dev->dev);
632}
633
634static int rbd_ioctl_set_ro(struct rbd_device *rbd_dev, unsigned long arg)
635{
636 int ret = 0;
637 int val;
638 bool ro;
639 bool ro_changed = false;
640
641 /* get_user() may sleep, so call it before taking rbd_dev->lock */
642 if (get_user(val, (int __user *)(arg)))
643 return -EFAULT;
644
645 ro = val ? true : false;
646	/* Snapshots do not allow writes */
647 if (rbd_dev->spec->snap_id != CEPH_NOSNAP && !ro)
648 return -EROFS;
649
650 spin_lock_irq(&rbd_dev->lock);
651	/* prevent others from opening this device */
652 if (rbd_dev->open_count > 1) {
653 ret = -EBUSY;
654 goto out;
655 }
656
657 if (rbd_dev->mapping.read_only != ro) {
658 rbd_dev->mapping.read_only = ro;
659 ro_changed = true;
660 }
661
662out:
663 spin_unlock_irq(&rbd_dev->lock);
664 /* set_disk_ro() may sleep, so call it after releasing rbd_dev->lock */
665 if (ret == 0 && ro_changed)
666 set_disk_ro(rbd_dev->disk, ro ? 1 : 0);
667
668 return ret;
669}
670
671static int rbd_ioctl(struct block_device *bdev, fmode_t mode,
672 unsigned int cmd, unsigned long arg)
673{
674 struct rbd_device *rbd_dev = bdev->bd_disk->private_data;
675 int ret = 0;
676
677 switch (cmd) {
678 case BLKROSET:
679 ret = rbd_ioctl_set_ro(rbd_dev, arg);
680 break;
681 default:
682 ret = -ENOTTY;
683 }
684
685 return ret;
686}
687
688#ifdef CONFIG_COMPAT
689static int rbd_compat_ioctl(struct block_device *bdev, fmode_t mode,
690 unsigned int cmd, unsigned long arg)
691{
692 return rbd_ioctl(bdev, mode, cmd, arg);
693}
694#endif /* CONFIG_COMPAT */
695
696static const struct block_device_operations rbd_bd_ops = {
697 .owner = THIS_MODULE,
698 .open = rbd_open,
699 .release = rbd_release,
700 .ioctl = rbd_ioctl,
701#ifdef CONFIG_COMPAT
702 .compat_ioctl = rbd_compat_ioctl,
703#endif
704};
705
706/*
707 * Initialize an rbd client instance. Success or not, this function
708 * consumes ceph_opts. Caller holds client_mutex.
709 */
710static struct rbd_client *rbd_client_create(struct ceph_options *ceph_opts)
711{
712 struct rbd_client *rbdc;
713 int ret = -ENOMEM;
714
715 dout("%s:\n", __func__);
716 rbdc = kmalloc(sizeof(struct rbd_client), GFP_KERNEL);
717 if (!rbdc)
718 goto out_opt;
719
720 kref_init(&rbdc->kref);
721 INIT_LIST_HEAD(&rbdc->node);
722
723 rbdc->client = ceph_create_client(ceph_opts, rbdc);
724 if (IS_ERR(rbdc->client))
725 goto out_rbdc;
726 ceph_opts = NULL; /* Now rbdc->client is responsible for ceph_opts */
727
728 ret = ceph_open_session(rbdc->client);
729 if (ret < 0)
730 goto out_client;
731
732 spin_lock(&rbd_client_list_lock);
733 list_add_tail(&rbdc->node, &rbd_client_list);
734 spin_unlock(&rbd_client_list_lock);
735
736 dout("%s: rbdc %p\n", __func__, rbdc);
737
738 return rbdc;
739out_client:
740 ceph_destroy_client(rbdc->client);
741out_rbdc:
742 kfree(rbdc);
743out_opt:
744 if (ceph_opts)
745 ceph_destroy_options(ceph_opts);
746 dout("%s: error %d\n", __func__, ret);
747
748 return ERR_PTR(ret);
749}
750
751static struct rbd_client *__rbd_get_client(struct rbd_client *rbdc)
752{
753 kref_get(&rbdc->kref);
754
755 return rbdc;
756}
757
758/*
759 * Find a ceph client with specific addr and configuration. If
760 * found, bump its reference count.
761 */
762static struct rbd_client *rbd_client_find(struct ceph_options *ceph_opts)
763{
764 struct rbd_client *client_node;
765 bool found = false;
766
767 if (ceph_opts->flags & CEPH_OPT_NOSHARE)
768 return NULL;
769
770 spin_lock(&rbd_client_list_lock);
771 list_for_each_entry(client_node, &rbd_client_list, node) {
772 if (!ceph_compare_options(ceph_opts, client_node->client)) {
773 __rbd_get_client(client_node);
774
775 found = true;
776 break;
777 }
778 }
779 spin_unlock(&rbd_client_list_lock);
780
781 return found ? client_node : NULL;
782}
783
784/*
785 * (Per device) rbd map options
786 */
787enum {
788 Opt_queue_depth,
789 Opt_last_int,
790 /* int args above */
791 Opt_last_string,
792 /* string args above */
793 Opt_read_only,
794 Opt_read_write,
795 Opt_lock_on_read,
796 Opt_exclusive,
797 Opt_err
798};
799
800static match_table_t rbd_opts_tokens = {
801 {Opt_queue_depth, "queue_depth=%d"},
802 /* int args above */
803 /* string args above */
804 {Opt_read_only, "read_only"},
805 {Opt_read_only, "ro"}, /* Alternate spelling */
806 {Opt_read_write, "read_write"},
807 {Opt_read_write, "rw"}, /* Alternate spelling */
808 {Opt_lock_on_read, "lock_on_read"},
809 {Opt_exclusive, "exclusive"},
810 {Opt_err, NULL}
811};
812
813struct rbd_options {
814 int queue_depth;
815 bool read_only;
816 bool lock_on_read;
817 bool exclusive;
818};
819
820#define RBD_QUEUE_DEPTH_DEFAULT BLKDEV_MAX_RQ
821#define RBD_READ_ONLY_DEFAULT false
822#define RBD_LOCK_ON_READ_DEFAULT false
823#define RBD_EXCLUSIVE_DEFAULT false
824
825static int parse_rbd_opts_token(char *c, void *private)
826{
827 struct rbd_options *rbd_opts = private;
828 substring_t argstr[MAX_OPT_ARGS];
829 int token, intval, ret;
830
831 token = match_token(c, rbd_opts_tokens, argstr);
832 if (token < Opt_last_int) {
833 ret = match_int(&argstr[0], &intval);
834 if (ret < 0) {
835 pr_err("bad mount option arg (not int) at '%s'\n", c);
836 return ret;
837 }
838 dout("got int token %d val %d\n", token, intval);
839 } else if (token > Opt_last_int && token < Opt_last_string) {
840 dout("got string token %d val %s\n", token, argstr[0].from);
841 } else {
842 dout("got token %d\n", token);
843 }
844
845 switch (token) {
846 case Opt_queue_depth:
847 if (intval < 1) {
848 pr_err("queue_depth out of range\n");
849 return -EINVAL;
850 }
851 rbd_opts->queue_depth = intval;
852 break;
853 case Opt_read_only:
854 rbd_opts->read_only = true;
855 break;
856 case Opt_read_write:
857 rbd_opts->read_only = false;
858 break;
859 case Opt_lock_on_read:
860 rbd_opts->lock_on_read = true;
861 break;
862 case Opt_exclusive:
863 rbd_opts->exclusive = true;
864 break;
865 default:
866 /* libceph prints "bad option" msg */
867 return -EINVAL;
868 }
869
870 return 0;
871}
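/*
 * For illustration: a map request might carry the option string
 * "queue_depth=128,lock_on_read,ro".  The options parser hands each
 * comma-separated token to the callback above, leaving rbd_opts with
 * queue_depth = 128, lock_on_read = true and read_only = true.
 */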
872
873static char *obj_op_name(enum obj_operation_type op_type)
874{
875 switch (op_type) {
876 case OBJ_OP_READ:
877 return "read";
878 case OBJ_OP_WRITE:
879 return "write";
880 case OBJ_OP_DISCARD:
881 return "discard";
882 default:
883 return "???";
884 }
885}
886
887/*
888 * Get a ceph client with specific addr and configuration, if one does
889 * not exist create it. Either way, ceph_opts is consumed by this
890 * function.
891 */
892static struct rbd_client *rbd_get_client(struct ceph_options *ceph_opts)
893{
894 struct rbd_client *rbdc;
895
896 mutex_lock_nested(&client_mutex, SINGLE_DEPTH_NESTING);
897 rbdc = rbd_client_find(ceph_opts);
898 if (rbdc) /* using an existing client */
899 ceph_destroy_options(ceph_opts);
900 else
901 rbdc = rbd_client_create(ceph_opts);
902 mutex_unlock(&client_mutex);
903
904 return rbdc;
905}
906
907/*
908 * Destroy ceph client
909 *
910 * Caller must hold rbd_client_list_lock.
911 */
912static void rbd_client_release(struct kref *kref)
913{
914 struct rbd_client *rbdc = container_of(kref, struct rbd_client, kref);
915
916 dout("%s: rbdc %p\n", __func__, rbdc);
917 spin_lock(&rbd_client_list_lock);
918 list_del(&rbdc->node);
919 spin_unlock(&rbd_client_list_lock);
920
921 ceph_destroy_client(rbdc->client);
922 kfree(rbdc);
923}
924
925/*
926 * Drop reference to ceph client node. If it's not referenced anymore, release
927 * it.
928 */
929static void rbd_put_client(struct rbd_client *rbdc)
930{
931 if (rbdc)
932 kref_put(&rbdc->kref, rbd_client_release);
933}
934
935static bool rbd_image_format_valid(u32 image_format)
936{
937 return image_format == 1 || image_format == 2;
938}
939
940static bool rbd_dev_ondisk_valid(struct rbd_image_header_ondisk *ondisk)
941{
942 size_t size;
943 u32 snap_count;
944
945 /* The header has to start with the magic rbd header text */
946 if (memcmp(&ondisk->text, RBD_HEADER_TEXT, sizeof (RBD_HEADER_TEXT)))
947 return false;
948
949 /* The bio layer requires at least sector-sized I/O */
950
951 if (ondisk->options.order < SECTOR_SHIFT)
952 return false;
953
954 /* If we use u64 in a few spots we may be able to loosen this */
955
956 if (ondisk->options.order > 8 * sizeof (int) - 1)
957 return false;
958
959 /*
960 * The size of a snapshot header has to fit in a size_t, and
961 * that limits the number of snapshots.
962 */
963 snap_count = le32_to_cpu(ondisk->snap_count);
964 size = SIZE_MAX - sizeof (struct ceph_snap_context);
965 if (snap_count > size / sizeof (__le64))
966 return false;
967
968 /*
969 * Not only that, but the size of the entire snapshot
970 * header must also be representable in a size_t.
971 */
972 size -= snap_count * sizeof (__le64);
973 if ((u64) size < le64_to_cpu(ondisk->snap_names_len))
974 return false;
975
976 return true;
977}
978
979/*
980 * returns the size of an object in the image
981 */
982static u32 rbd_obj_bytes(struct rbd_image_header *header)
983{
984 return 1U << header->obj_order;
985}
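/*
 * E.g. with the common default object order of 22 this is
 * 1U << 22 == 4 MiB, so a 1 GiB image is backed by 256 data objects.
 */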
986
987static void rbd_init_layout(struct rbd_device *rbd_dev)
988{
989 if (rbd_dev->header.stripe_unit == 0 ||
990 rbd_dev->header.stripe_count == 0) {
991 rbd_dev->header.stripe_unit = rbd_obj_bytes(&rbd_dev->header);
992 rbd_dev->header.stripe_count = 1;
993 }
994
995 rbd_dev->layout.stripe_unit = rbd_dev->header.stripe_unit;
996 rbd_dev->layout.stripe_count = rbd_dev->header.stripe_count;
997 rbd_dev->layout.object_size = rbd_obj_bytes(&rbd_dev->header);
998 rbd_dev->layout.pool_id = rbd_dev->header.data_pool_id == CEPH_NOPOOL ?
999 rbd_dev->spec->pool_id : rbd_dev->header.data_pool_id;
1000 RCU_INIT_POINTER(rbd_dev->layout.pool_ns, NULL);
1001}
1002
1003/*
1004 * Fill an rbd image header with information from the given format 1
1005 * on-disk header.
1006 */
1007static int rbd_header_from_disk(struct rbd_device *rbd_dev,
1008 struct rbd_image_header_ondisk *ondisk)
1009{
1010 struct rbd_image_header *header = &rbd_dev->header;
1011 bool first_time = header->object_prefix == NULL;
1012 struct ceph_snap_context *snapc;
1013 char *object_prefix = NULL;
1014 char *snap_names = NULL;
1015 u64 *snap_sizes = NULL;
1016 u32 snap_count;
1017 int ret = -ENOMEM;
1018 u32 i;
1019
1020 /* Allocate this now to avoid having to handle failure below */
1021
1022 if (first_time) {
1023 object_prefix = kstrndup(ondisk->object_prefix,
1024 sizeof(ondisk->object_prefix),
1025 GFP_KERNEL);
1026 if (!object_prefix)
1027 return -ENOMEM;
1028 }
1029
1030 /* Allocate the snapshot context and fill it in */
1031
1032 snap_count = le32_to_cpu(ondisk->snap_count);
1033 snapc = ceph_create_snap_context(snap_count, GFP_KERNEL);
1034 if (!snapc)
1035 goto out_err;
1036 snapc->seq = le64_to_cpu(ondisk->snap_seq);
1037 if (snap_count) {
1038 struct rbd_image_snap_ondisk *snaps;
1039 u64 snap_names_len = le64_to_cpu(ondisk->snap_names_len);
1040
1041 /* We'll keep a copy of the snapshot names... */
1042
1043 if (snap_names_len > (u64)SIZE_MAX)
1044 goto out_2big;
1045 snap_names = kmalloc(snap_names_len, GFP_KERNEL);
1046 if (!snap_names)
1047 goto out_err;
1048
1049 /* ...as well as the array of their sizes. */
1050 snap_sizes = kmalloc_array(snap_count,
1051 sizeof(*header->snap_sizes),
1052 GFP_KERNEL);
1053 if (!snap_sizes)
1054 goto out_err;
1055
1056 /*
1057 * Copy the names, and fill in each snapshot's id
1058 * and size.
1059 *
1060 * Note that rbd_dev_v1_header_info() guarantees the
1061 * ondisk buffer we're working with has
1062 * snap_names_len bytes beyond the end of the
1063 * snapshot id array, so this memcpy() is safe.
1064 */
1065 memcpy(snap_names, &ondisk->snaps[snap_count], snap_names_len);
1066 snaps = ondisk->snaps;
1067 for (i = 0; i < snap_count; i++) {
1068 snapc->snaps[i] = le64_to_cpu(snaps[i].id);
1069 snap_sizes[i] = le64_to_cpu(snaps[i].image_size);
1070 }
1071 }
1072
1073	/* We won't fail from here on; fill in the header */
1074
1075 if (first_time) {
1076 header->object_prefix = object_prefix;
1077 header->obj_order = ondisk->options.order;
1078 rbd_init_layout(rbd_dev);
1079 } else {
1080 ceph_put_snap_context(header->snapc);
1081 kfree(header->snap_names);
1082 kfree(header->snap_sizes);
1083 }
1084
1085 /* The remaining fields always get updated (when we refresh) */
1086
1087 header->image_size = le64_to_cpu(ondisk->image_size);
1088 header->snapc = snapc;
1089 header->snap_names = snap_names;
1090 header->snap_sizes = snap_sizes;
1091
1092 return 0;
1093out_2big:
1094 ret = -EIO;
1095out_err:
1096 kfree(snap_sizes);
1097 kfree(snap_names);
1098 ceph_put_snap_context(snapc);
1099 kfree(object_prefix);
1100
1101 return ret;
1102}
1103
1104static const char *_rbd_dev_v1_snap_name(struct rbd_device *rbd_dev, u32 which)
1105{
1106 const char *snap_name;
1107
1108 rbd_assert(which < rbd_dev->header.snapc->num_snaps);
1109
1110 /* Skip over names until we find the one we are looking for */
1111
1112 snap_name = rbd_dev->header.snap_names;
1113 while (which--)
1114 snap_name += strlen(snap_name) + 1;
1115
1116 return kstrdup(snap_name, GFP_KERNEL);
1117}
1118
1119/*
1120 * Snapshot id comparison function for use with sort()/bsearch().
1121 * Note that result is for snapshots in *descending* order.
1122 */
1123static int snapid_compare_reverse(const void *s1, const void *s2)
1124{
1125 u64 snap_id1 = *(u64 *)s1;
1126 u64 snap_id2 = *(u64 *)s2;
1127
1128 if (snap_id1 < snap_id2)
1129 return 1;
1130 return snap_id1 == snap_id2 ? 0 : -1;
1131}
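/*
 * Quick sanity check of the reversed comparator: for the array
 * {12, 7, 3} (highest id first, as the osd keeps it), comparing 7
 * against 12 yields 1 and 7 against 3 yields -1, so bsearch() sees
 * the array as "ascending" and lookups work unmodified.
 */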
1132
1133/*
1134 * Search a snapshot context to see if the given snapshot id is
1135 * present.
1136 *
1137 * Returns the position of the snapshot id in the array if it's found,
1138 * or BAD_SNAP_INDEX otherwise.
1139 *
1140 * Note: The snapshot array is kept sorted (by the osd) in
1141 * reverse order, highest snapshot id first.
1142 */
1143static u32 rbd_dev_snap_index(struct rbd_device *rbd_dev, u64 snap_id)
1144{
1145 struct ceph_snap_context *snapc = rbd_dev->header.snapc;
1146 u64 *found;
1147
1148 found = bsearch(&snap_id, &snapc->snaps, snapc->num_snaps,
1149 sizeof (snap_id), snapid_compare_reverse);
1150
1151 return found ? (u32)(found - &snapc->snaps[0]) : BAD_SNAP_INDEX;
1152}
1153
1154static const char *rbd_dev_v1_snap_name(struct rbd_device *rbd_dev,
1155 u64 snap_id)
1156{
1157 u32 which;
1158 const char *snap_name;
1159
1160 which = rbd_dev_snap_index(rbd_dev, snap_id);
1161 if (which == BAD_SNAP_INDEX)
1162 return ERR_PTR(-ENOENT);
1163
1164 snap_name = _rbd_dev_v1_snap_name(rbd_dev, which);
1165 return snap_name ? snap_name : ERR_PTR(-ENOMEM);
1166}
1167
1168static const char *rbd_snap_name(struct rbd_device *rbd_dev, u64 snap_id)
1169{
1170 if (snap_id == CEPH_NOSNAP)
1171 return RBD_SNAP_HEAD_NAME;
1172
1173 rbd_assert(rbd_image_format_valid(rbd_dev->image_format));
1174 if (rbd_dev->image_format == 1)
1175 return rbd_dev_v1_snap_name(rbd_dev, snap_id);
1176
1177 return rbd_dev_v2_snap_name(rbd_dev, snap_id);
1178}
1179
1180static int rbd_snap_size(struct rbd_device *rbd_dev, u64 snap_id,
1181 u64 *snap_size)
1182{
1183 rbd_assert(rbd_image_format_valid(rbd_dev->image_format));
1184 if (snap_id == CEPH_NOSNAP) {
1185 *snap_size = rbd_dev->header.image_size;
1186 } else if (rbd_dev->image_format == 1) {
1187 u32 which;
1188
1189 which = rbd_dev_snap_index(rbd_dev, snap_id);
1190 if (which == BAD_SNAP_INDEX)
1191 return -ENOENT;
1192
1193 *snap_size = rbd_dev->header.snap_sizes[which];
1194 } else {
1195 u64 size = 0;
1196 int ret;
1197
1198 ret = _rbd_dev_v2_snap_size(rbd_dev, snap_id, NULL, &size);
1199 if (ret)
1200 return ret;
1201
1202 *snap_size = size;
1203 }
1204 return 0;
1205}
1206
1207static int rbd_snap_features(struct rbd_device *rbd_dev, u64 snap_id,
1208 u64 *snap_features)
1209{
1210 rbd_assert(rbd_image_format_valid(rbd_dev->image_format));
1211 if (snap_id == CEPH_NOSNAP) {
1212 *snap_features = rbd_dev->header.features;
1213 } else if (rbd_dev->image_format == 1) {
1214 *snap_features = 0; /* No features for format 1 */
1215 } else {
1216 u64 features = 0;
1217 int ret;
1218
1219 ret = _rbd_dev_v2_snap_features(rbd_dev, snap_id, &features);
1220 if (ret)
1221 return ret;
1222
1223 *snap_features = features;
1224 }
1225 return 0;
1226}
1227
1228static int rbd_dev_mapping_set(struct rbd_device *rbd_dev)
1229{
1230 u64 snap_id = rbd_dev->spec->snap_id;
1231 u64 size = 0;
1232 u64 features = 0;
1233 int ret;
1234
1235 ret = rbd_snap_size(rbd_dev, snap_id, &size);
1236 if (ret)
1237 return ret;
1238 ret = rbd_snap_features(rbd_dev, snap_id, &features);
1239 if (ret)
1240 return ret;
1241
1242 rbd_dev->mapping.size = size;
1243 rbd_dev->mapping.features = features;
1244
1245 return 0;
1246}
1247
1248static void rbd_dev_mapping_clear(struct rbd_device *rbd_dev)
1249{
1250 rbd_dev->mapping.size = 0;
1251 rbd_dev->mapping.features = 0;
1252}
1253
1254static u64 rbd_segment_offset(struct rbd_device *rbd_dev, u64 offset)
1255{
1256 u64 segment_size = rbd_obj_bytes(&rbd_dev->header);
1257
1258 return offset & (segment_size - 1);
1259}
1260
1261static u64 rbd_segment_length(struct rbd_device *rbd_dev,
1262 u64 offset, u64 length)
1263{
1264 u64 segment_size = rbd_obj_bytes(&rbd_dev->header);
1265
1266 offset &= segment_size - 1;
1267
1268 rbd_assert(length <= U64_MAX - offset);
1269 if (offset + length > segment_size)
1270 length = segment_size - offset;
1271
1272 return length;
1273}
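/*
 * Worked example, assuming 4 MiB (order 22) objects: a 64 KiB request
 * at image offset 0x3ff000 starts 0x3ff000 bytes into object 0, so it
 * is clipped to 0x1000 bytes (the remainder of that object); the next
 * segment then begins at offset 0 of object 1.
 */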
1274
1275/*
1276 * bio helpers
1277 */
1278
1279static void bio_chain_put(struct bio *chain)
1280{
1281 struct bio *tmp;
1282
1283 while (chain) {
1284 tmp = chain;
1285 chain = chain->bi_next;
1286 bio_put(tmp);
1287 }
1288}
1289
1290/*
1291 * zeros a bio chain, starting at specific offset
1292 */
1293static void zero_bio_chain(struct bio *chain, int start_ofs)
1294{
1295 struct bio_vec bv;
1296 struct bvec_iter iter;
1297 unsigned long flags;
1298 void *buf;
1299 int pos = 0;
1300
1301 while (chain) {
1302 bio_for_each_segment(bv, chain, iter) {
1303 if (pos + bv.bv_len > start_ofs) {
1304 int remainder = max(start_ofs - pos, 0);
1305 buf = bvec_kmap_irq(&bv, &flags);
1306 memset(buf + remainder, 0,
1307 bv.bv_len - remainder);
1308 flush_dcache_page(bv.bv_page);
1309 bvec_kunmap_irq(buf, &flags);
1310 }
1311 pos += bv.bv_len;
1312 }
1313
1314 chain = chain->bi_next;
1315 }
1316}
1317
1318/*
1319 * similar to zero_bio_chain(), zeros data defined by a page array,
1320 * starting at the given byte offset from the start of the array and
1321 * continuing up to the given end offset. The pages array is
1322 * assumed to be big enough to hold all bytes up to the end.
1323 */
1324static void zero_pages(struct page **pages, u64 offset, u64 end)
1325{
1326 struct page **page = &pages[offset >> PAGE_SHIFT];
1327
1328 rbd_assert(end > offset);
1329 rbd_assert(end - offset <= (u64)SIZE_MAX);
1330 while (offset < end) {
1331 size_t page_offset;
1332 size_t length;
1333 unsigned long flags;
1334 void *kaddr;
1335
1336 page_offset = offset & ~PAGE_MASK;
1337 length = min_t(size_t, PAGE_SIZE - page_offset, end - offset);
1338 local_irq_save(flags);
1339 kaddr = kmap_atomic(*page);
1340 memset(kaddr + page_offset, 0, length);
1341 flush_dcache_page(*page);
1342 kunmap_atomic(kaddr);
1343 local_irq_restore(flags);
1344
1345 offset += length;
1346 page++;
1347 }
1348}
1349
1350/*
1351 * Clone a portion of a bio, starting at the given byte offset
1352 * and continuing for the number of bytes indicated.
1353 */
1354static struct bio *bio_clone_range(struct bio *bio_src,
1355 unsigned int offset,
1356 unsigned int len,
1357 gfp_t gfpmask)
1358{
1359 struct bio *bio;
1360
1361 bio = bio_clone_fast(bio_src, gfpmask, rbd_bio_clone);
1362 if (!bio)
1363 return NULL; /* ENOMEM */
1364
1365 bio_advance(bio, offset);
1366 bio->bi_iter.bi_size = len;
1367
1368 return bio;
1369}
1370
1371/*
1372 * Clone a portion of a bio chain, starting at the given byte offset
1373 * into the first bio in the source chain and continuing for the
1374 * number of bytes indicated. The result is another bio chain of
1375 * exactly the given length, or a null pointer on error.
1376 *
1377 * The bio_src and offset parameters are both in-out. On entry they
1378 * refer to the first source bio and the offset into that bio where
1379 * the start of data to be cloned is located.
1380 *
1381 * On return, bio_src is updated to refer to the bio in the source
1382 * chain that contains first un-cloned byte, and *offset will
1383 * contain the offset of that byte within that bio.
1384 */
1385static struct bio *bio_chain_clone_range(struct bio **bio_src,
1386 unsigned int *offset,
1387 unsigned int len,
1388 gfp_t gfpmask)
1389{
1390 struct bio *bi = *bio_src;
1391 unsigned int off = *offset;
1392 struct bio *chain = NULL;
1393 struct bio **end;
1394
1395 /* Build up a chain of clone bios up to the limit */
1396
1397 if (!bi || off >= bi->bi_iter.bi_size || !len)
1398 return NULL; /* Nothing to clone */
1399
1400 end = &chain;
1401 while (len) {
1402 unsigned int bi_size;
1403 struct bio *bio;
1404
1405 if (!bi) {
1406 rbd_warn(NULL, "bio_chain exhausted with %u left", len);
1407			goto out_err;	/* EINVAL; ran out of bios */
1408 }
1409 bi_size = min_t(unsigned int, bi->bi_iter.bi_size - off, len);
1410 bio = bio_clone_range(bi, off, bi_size, gfpmask);
1411 if (!bio)
1412 goto out_err; /* ENOMEM */
1413
1414 *end = bio;
1415 end = &bio->bi_next;
1416
1417 off += bi_size;
1418 if (off == bi->bi_iter.bi_size) {
1419 bi = bi->bi_next;
1420 off = 0;
1421 }
1422 len -= bi_size;
1423 }
1424 *bio_src = bi;
1425 *offset = off;
1426
1427 return chain;
1428out_err:
1429 bio_chain_put(chain);
1430
1431 return NULL;
1432}
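/*
 * Typical call pattern (a sketch, not lifted from an actual caller;
 * all names below are hypothetical):
 *
 *	struct bio *bio = rq_bio;	// head of the source chain
 *	unsigned int off = start_off;	// byte offset into that bio
 *
 *	while (resid) {
 *		unsigned int len = min(resid, bytes_left_in_object);
 *		struct bio *clone;
 *
 *		clone = bio_chain_clone_range(&bio, &off, len, GFP_NOIO);
 *		if (!clone)
 *			goto fail;	// ENOMEM or chain exhausted
 *		// hand "clone" to one object request; "bio" and "off"
 *		// now name the first un-cloned byte for the next pass
 *		resid -= len;
 *	}
 */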
1433
1434/*
1435 * The default/initial value for all object request flags is 0. For
1436 * each flag, once its value is set to 1 it is never reset to 0
1437 * again.
1438 */
1439static void obj_request_img_data_set(struct rbd_obj_request *obj_request)
1440{
1441 if (test_and_set_bit(OBJ_REQ_IMG_DATA, &obj_request->flags)) {
1442 struct rbd_device *rbd_dev;
1443
1444 rbd_dev = obj_request->img_request->rbd_dev;
1445 rbd_warn(rbd_dev, "obj_request %p already marked img_data",
1446 obj_request);
1447 }
1448}
1449
1450static bool obj_request_img_data_test(struct rbd_obj_request *obj_request)
1451{
1452 smp_mb();
1453 return test_bit(OBJ_REQ_IMG_DATA, &obj_request->flags) != 0;
1454}
1455
1456static void obj_request_done_set(struct rbd_obj_request *obj_request)
1457{
1458 if (test_and_set_bit(OBJ_REQ_DONE, &obj_request->flags)) {
1459 struct rbd_device *rbd_dev = NULL;
1460
1461 if (obj_request_img_data_test(obj_request))
1462 rbd_dev = obj_request->img_request->rbd_dev;
1463 rbd_warn(rbd_dev, "obj_request %p already marked done",
1464 obj_request);
1465 }
1466}
1467
1468static bool obj_request_done_test(struct rbd_obj_request *obj_request)
1469{
1470 smp_mb();
1471 return test_bit(OBJ_REQ_DONE, &obj_request->flags) != 0;
1472}
1473
1474/*
1475 * This sets the KNOWN flag after (possibly) setting the EXISTS
1476 * flag. The latter is set based on the "exists" value provided.
1477 *
1478 * Note that for our purposes once an object exists it never goes
1479 * away again. It's possible that the responses from two existence
1480 * checks are separated by the creation of the target object, and
1481 * the first ("doesn't exist") response arrives *after* the second
1482 * ("does exist"). In that case we ignore the second one.
1483 */
1484static void obj_request_existence_set(struct rbd_obj_request *obj_request,
1485 bool exists)
1486{
1487 if (exists)
1488 set_bit(OBJ_REQ_EXISTS, &obj_request->flags);
1489 set_bit(OBJ_REQ_KNOWN, &obj_request->flags);
1490 smp_mb();
1491}
1492
1493static bool obj_request_known_test(struct rbd_obj_request *obj_request)
1494{
1495 smp_mb();
1496 return test_bit(OBJ_REQ_KNOWN, &obj_request->flags) != 0;
1497}
1498
1499static bool obj_request_exists_test(struct rbd_obj_request *obj_request)
1500{
1501 smp_mb();
1502 return test_bit(OBJ_REQ_EXISTS, &obj_request->flags) != 0;
1503}
1504
1505static bool obj_request_overlaps_parent(struct rbd_obj_request *obj_request)
1506{
1507 struct rbd_device *rbd_dev = obj_request->img_request->rbd_dev;
1508
1509 return obj_request->img_offset <
1510 round_up(rbd_dev->parent_overlap, rbd_obj_bytes(&rbd_dev->header));
1511}
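/*
 * Example, assuming 4 MiB objects: with a parent overlap of 6 MiB the
 * round_up() above widens the window to 8 MiB, so object 1 (image
 * offsets 4..8 MiB) still consults the parent even though only its
 * first 2 MiB actually fall within the overlap.
 */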
1512
1513static void rbd_obj_request_get(struct rbd_obj_request *obj_request)
1514{
1515 dout("%s: obj %p (was %d)\n", __func__, obj_request,
1516 kref_read(&obj_request->kref));
1517 kref_get(&obj_request->kref);
1518}
1519
1520static void rbd_obj_request_destroy(struct kref *kref);
1521static void rbd_obj_request_put(struct rbd_obj_request *obj_request)
1522{
1523 rbd_assert(obj_request != NULL);
1524 dout("%s: obj %p (was %d)\n", __func__, obj_request,
1525 kref_read(&obj_request->kref));
1526 kref_put(&obj_request->kref, rbd_obj_request_destroy);
1527}
1528
1529static void rbd_img_request_get(struct rbd_img_request *img_request)
1530{
1531 dout("%s: img %p (was %d)\n", __func__, img_request,
1532 kref_read(&img_request->kref));
1533 kref_get(&img_request->kref);
1534}
1535
1536static bool img_request_child_test(struct rbd_img_request *img_request);
1537static void rbd_parent_request_destroy(struct kref *kref);
1538static void rbd_img_request_destroy(struct kref *kref);
1539static void rbd_img_request_put(struct rbd_img_request *img_request)
1540{
1541 rbd_assert(img_request != NULL);
1542 dout("%s: img %p (was %d)\n", __func__, img_request,
1543 kref_read(&img_request->kref));
1544 if (img_request_child_test(img_request))
1545 kref_put(&img_request->kref, rbd_parent_request_destroy);
1546 else
1547 kref_put(&img_request->kref, rbd_img_request_destroy);
1548}
1549
1550static inline void rbd_img_obj_request_add(struct rbd_img_request *img_request,
1551 struct rbd_obj_request *obj_request)
1552{
1553 rbd_assert(obj_request->img_request == NULL);
1554
1555 /* Image request now owns object's original reference */
1556 obj_request->img_request = img_request;
1557 obj_request->which = img_request->obj_request_count;
1558 rbd_assert(!obj_request_img_data_test(obj_request));
1559 obj_request_img_data_set(obj_request);
1560 rbd_assert(obj_request->which != BAD_WHICH);
1561 img_request->obj_request_count++;
1562 list_add_tail(&obj_request->links, &img_request->obj_requests);
1563 dout("%s: img %p obj %p w=%u\n", __func__, img_request, obj_request,
1564 obj_request->which);
1565}
1566
1567static inline void rbd_img_obj_request_del(struct rbd_img_request *img_request,
1568 struct rbd_obj_request *obj_request)
1569{
1570 rbd_assert(obj_request->which != BAD_WHICH);
1571
1572 dout("%s: img %p obj %p w=%u\n", __func__, img_request, obj_request,
1573 obj_request->which);
1574 list_del(&obj_request->links);
1575 rbd_assert(img_request->obj_request_count > 0);
1576 img_request->obj_request_count--;
1577 rbd_assert(obj_request->which == img_request->obj_request_count);
1578 obj_request->which = BAD_WHICH;
1579 rbd_assert(obj_request_img_data_test(obj_request));
1580 rbd_assert(obj_request->img_request == img_request);
1581 obj_request->img_request = NULL;
1582 obj_request->callback = NULL;
1583 rbd_obj_request_put(obj_request);
1584}
1585
1586static bool obj_request_type_valid(enum obj_request_type type)
1587{
1588 switch (type) {
1589 case OBJ_REQUEST_NODATA:
1590 case OBJ_REQUEST_BIO:
1591 case OBJ_REQUEST_PAGES:
1592 return true;
1593 default:
1594 return false;
1595 }
1596}
1597
1598static void rbd_img_obj_callback(struct rbd_obj_request *obj_request);
1599
1600static void rbd_obj_request_submit(struct rbd_obj_request *obj_request)
1601{
1602 struct ceph_osd_request *osd_req = obj_request->osd_req;
1603
1604 dout("%s %p object_no %016llx %llu~%llu osd_req %p\n", __func__,
1605 obj_request, obj_request->object_no, obj_request->offset,
1606 obj_request->length, osd_req);
1607 if (obj_request_img_data_test(obj_request)) {
1608 WARN_ON(obj_request->callback != rbd_img_obj_callback);
1609 rbd_img_request_get(obj_request->img_request);
1610 }
1611 ceph_osdc_start_request(osd_req->r_osdc, osd_req, false);
1612}
1613
1614static void rbd_img_request_complete(struct rbd_img_request *img_request)
1615{
1616
1617 dout("%s: img %p\n", __func__, img_request);
1618
1619 /*
1620 * If no error occurred, compute the aggregate transfer
1621 * count for the image request. We could instead use
1622 * atomic64_cmpxchg() to update it as each object request
1623 * completes; it is not clear offhand which way is better.
1624 */
1625 if (!img_request->result) {
1626 struct rbd_obj_request *obj_request;
1627 u64 xferred = 0;
1628
1629 for_each_obj_request(img_request, obj_request)
1630 xferred += obj_request->xferred;
1631 img_request->xferred = xferred;
1632 }
1633
1634 if (img_request->callback)
1635 img_request->callback(img_request);
1636 else
1637 rbd_img_request_put(img_request);
1638}
1639
1640/*
1641 * The default/initial value for all image request flags is 0. Each
1642 * is conditionally set to 1 at image request initialization time
1643 * and currently never changes thereafter.
1644 */
1645static void img_request_write_set(struct rbd_img_request *img_request)
1646{
1647 set_bit(IMG_REQ_WRITE, &img_request->flags);
1648 smp_mb();
1649}
1650
1651static bool img_request_write_test(struct rbd_img_request *img_request)
1652{
1653 smp_mb();
1654 return test_bit(IMG_REQ_WRITE, &img_request->flags) != 0;
1655}
1656
1657/*
1658 * Set the discard flag when the img_request is a discard request
1659 */
1660static void img_request_discard_set(struct rbd_img_request *img_request)
1661{
1662 set_bit(IMG_REQ_DISCARD, &img_request->flags);
1663 smp_mb();
1664}
1665
1666static bool img_request_discard_test(struct rbd_img_request *img_request)
1667{
1668 smp_mb();
1669 return test_bit(IMG_REQ_DISCARD, &img_request->flags) != 0;
1670}
1671
1672static void img_request_child_set(struct rbd_img_request *img_request)
1673{
1674 set_bit(IMG_REQ_CHILD, &img_request->flags);
1675 smp_mb();
1676}
1677
1678static void img_request_child_clear(struct rbd_img_request *img_request)
1679{
1680 clear_bit(IMG_REQ_CHILD, &img_request->flags);
1681 smp_mb();
1682}
1683
1684static bool img_request_child_test(struct rbd_img_request *img_request)
1685{
1686 smp_mb();
1687 return test_bit(IMG_REQ_CHILD, &img_request->flags) != 0;
1688}
1689
1690static void img_request_layered_set(struct rbd_img_request *img_request)
1691{
1692 set_bit(IMG_REQ_LAYERED, &img_request->flags);
1693 smp_mb();
1694}
1695
1696static void img_request_layered_clear(struct rbd_img_request *img_request)
1697{
1698 clear_bit(IMG_REQ_LAYERED, &img_request->flags);
1699 smp_mb();
1700}
1701
1702static bool img_request_layered_test(struct rbd_img_request *img_request)
1703{
1704 smp_mb();
1705 return test_bit(IMG_REQ_LAYERED, &img_request->flags) != 0;
1706}
1707
1708static enum obj_operation_type
1709rbd_img_request_op_type(struct rbd_img_request *img_request)
1710{
1711 if (img_request_write_test(img_request))
1712 return OBJ_OP_WRITE;
1713 else if (img_request_discard_test(img_request))
1714 return OBJ_OP_DISCARD;
1715 else
1716 return OBJ_OP_READ;
1717}
1718
1719static void
1720rbd_img_obj_request_read_callback(struct rbd_obj_request *obj_request)
1721{
1722 u64 xferred = obj_request->xferred;
1723 u64 length = obj_request->length;
1724
1725 dout("%s: obj %p img %p result %d %llu/%llu\n", __func__,
1726 obj_request, obj_request->img_request, obj_request->result,
1727 xferred, length);
1728 /*
1729 * ENOENT means a hole in the image. We zero-fill the entire
1730 * length of the request. A short read also implies zero-fill
1731 * to the end of the request. An error requires the whole
1732 * length of the request to be reported finished with an error
1733 * to the block layer. In each case we update the xferred
1734 * count to indicate the whole request was satisfied.
1735 */
1736 rbd_assert(obj_request->type != OBJ_REQUEST_NODATA);
1737 if (obj_request->result == -ENOENT) {
1738 if (obj_request->type == OBJ_REQUEST_BIO)
1739 zero_bio_chain(obj_request->bio_list, 0);
1740 else
1741 zero_pages(obj_request->pages, 0, length);
1742 obj_request->result = 0;
1743 } else if (xferred < length && !obj_request->result) {
1744 if (obj_request->type == OBJ_REQUEST_BIO)
1745 zero_bio_chain(obj_request->bio_list, xferred);
1746 else
1747 zero_pages(obj_request->pages, xferred, length);
1748 }
1749 obj_request->xferred = length;
1750 obj_request_done_set(obj_request);
1751}
1752
1753static void rbd_obj_request_complete(struct rbd_obj_request *obj_request)
1754{
1755 dout("%s: obj %p cb %p\n", __func__, obj_request,
1756 obj_request->callback);
1757 if (obj_request->callback)
1758 obj_request->callback(obj_request);
1759 else
1760 complete_all(&obj_request->completion);
1761}
1762
1763static void rbd_obj_request_error(struct rbd_obj_request *obj_request, int err)
1764{
1765 obj_request->result = err;
1766 obj_request->xferred = 0;
1767 /*
1768 * kludge - mirror rbd_obj_request_submit() to match a put in
1769 * rbd_img_obj_callback()
1770 */
1771 if (obj_request_img_data_test(obj_request)) {
1772 WARN_ON(obj_request->callback != rbd_img_obj_callback);
1773 rbd_img_request_get(obj_request->img_request);
1774 }
1775 obj_request_done_set(obj_request);
1776 rbd_obj_request_complete(obj_request);
1777}
1778
1779static void rbd_osd_read_callback(struct rbd_obj_request *obj_request)
1780{
1781 struct rbd_img_request *img_request = NULL;
1782 struct rbd_device *rbd_dev = NULL;
1783 bool layered = false;
1784
1785 if (obj_request_img_data_test(obj_request)) {
1786 img_request = obj_request->img_request;
1787 layered = img_request && img_request_layered_test(img_request);
1788 rbd_dev = img_request->rbd_dev;
1789 }
1790
1791 dout("%s: obj %p img %p result %d %llu/%llu\n", __func__,
1792 obj_request, img_request, obj_request->result,
1793 obj_request->xferred, obj_request->length);
1794 if (layered && obj_request->result == -ENOENT &&
1795 obj_request->img_offset < rbd_dev->parent_overlap)
1796 rbd_img_parent_read(obj_request);
1797 else if (img_request)
1798 rbd_img_obj_request_read_callback(obj_request);
1799 else
1800 obj_request_done_set(obj_request);
1801}
1802
1803static void rbd_osd_write_callback(struct rbd_obj_request *obj_request)
1804{
1805 dout("%s: obj %p result %d %llu\n", __func__, obj_request,
1806 obj_request->result, obj_request->length);
1807 /*
1808 * There is no such thing as a successful short write. Set
1809 * it to our originally-requested length.
1810 */
1811 obj_request->xferred = obj_request->length;
1812 obj_request_done_set(obj_request);
1813}
1814
1815static void rbd_osd_discard_callback(struct rbd_obj_request *obj_request)
1816{
1817 dout("%s: obj %p result %d %llu\n", __func__, obj_request,
1818 obj_request->result, obj_request->length);
1819 /*
1820 * There is no such thing as a successful short discard. Set
1821 * it to our originally-requested length.
1822 */
1823 obj_request->xferred = obj_request->length;
1824 /* discarding a non-existent object is not a problem */
1825 if (obj_request->result == -ENOENT)
1826 obj_request->result = 0;
1827 obj_request_done_set(obj_request);
1828}
1829
1830/*
1831 * For a simple stat call there's nothing to do. We'll do more if
1832 * this is part of a write sequence for a layered image.
1833 */
1834static void rbd_osd_stat_callback(struct rbd_obj_request *obj_request)
1835{
1836 dout("%s: obj %p\n", __func__, obj_request);
1837 obj_request_done_set(obj_request);
1838}
1839
1840static void rbd_osd_call_callback(struct rbd_obj_request *obj_request)
1841{
1842 dout("%s: obj %p\n", __func__, obj_request);
1843
1844 if (obj_request_img_data_test(obj_request))
1845 rbd_osd_copyup_callback(obj_request);
1846 else
1847 obj_request_done_set(obj_request);
1848}
1849
1850static void rbd_osd_req_callback(struct ceph_osd_request *osd_req)
1851{
1852 struct rbd_obj_request *obj_request = osd_req->r_priv;
1853 u16 opcode;
1854
1855 dout("%s: osd_req %p\n", __func__, osd_req);
1856 rbd_assert(osd_req == obj_request->osd_req);
1857 if (obj_request_img_data_test(obj_request)) {
1858 rbd_assert(obj_request->img_request);
1859 rbd_assert(obj_request->which != BAD_WHICH);
1860 } else {
1861 rbd_assert(obj_request->which == BAD_WHICH);
1862 }
1863
1864 if (osd_req->r_result < 0)
1865 obj_request->result = osd_req->r_result;
1866
1867 /*
1868 * We support a 64-bit length, but ultimately it has to be
1869 * passed to the block layer, which just supports a 32-bit
1870 * length field.
1871 */
1872 obj_request->xferred = osd_req->r_ops[0].outdata_len;
1873 rbd_assert(obj_request->xferred < (u64)UINT_MAX);
1874
1875 opcode = osd_req->r_ops[0].op;
1876 switch (opcode) {
1877 case CEPH_OSD_OP_READ:
1878 rbd_osd_read_callback(obj_request);
1879 break;
1880 case CEPH_OSD_OP_SETALLOCHINT:
1881 rbd_assert(osd_req->r_ops[1].op == CEPH_OSD_OP_WRITE ||
1882 osd_req->r_ops[1].op == CEPH_OSD_OP_WRITEFULL);
1883 /* fall through */
1884 case CEPH_OSD_OP_WRITE:
1885 case CEPH_OSD_OP_WRITEFULL:
1886 rbd_osd_write_callback(obj_request);
1887 break;
1888 case CEPH_OSD_OP_STAT:
1889 rbd_osd_stat_callback(obj_request);
1890 break;
1891 case CEPH_OSD_OP_DELETE:
1892 case CEPH_OSD_OP_TRUNCATE:
1893 case CEPH_OSD_OP_ZERO:
1894 rbd_osd_discard_callback(obj_request);
1895 break;
1896 case CEPH_OSD_OP_CALL:
1897 rbd_osd_call_callback(obj_request);
1898 break;
1899 default:
1900 rbd_warn(NULL, "unexpected OSD op: object_no %016llx opcode %d",
1901 obj_request->object_no, opcode);
1902 break;
1903 }
1904
1905 if (obj_request_done_test(obj_request))
1906 rbd_obj_request_complete(obj_request);
1907}
1908
1909static void rbd_osd_req_format_read(struct rbd_obj_request *obj_request)
1910{
1911 struct ceph_osd_request *osd_req = obj_request->osd_req;
1912
1913 rbd_assert(obj_request_img_data_test(obj_request));
1914 osd_req->r_snapid = obj_request->img_request->snap_id;
1915}
1916
1917static void rbd_osd_req_format_write(struct rbd_obj_request *obj_request)
1918{
1919 struct ceph_osd_request *osd_req = obj_request->osd_req;
1920
1921 ktime_get_real_ts(&osd_req->r_mtime);
1922 osd_req->r_data_offset = obj_request->offset;
1923}
1924
1925static struct ceph_osd_request *
1926__rbd_osd_req_create(struct rbd_device *rbd_dev,
1927 struct ceph_snap_context *snapc,
1928 int num_ops, unsigned int flags,
1929 struct rbd_obj_request *obj_request)
1930{
1931 struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc;
1932 struct ceph_osd_request *req;
1933 const char *name_format = rbd_dev->image_format == 1 ?
1934 RBD_V1_DATA_FORMAT : RBD_V2_DATA_FORMAT;
1935
1936 req = ceph_osdc_alloc_request(osdc, snapc, num_ops, false, GFP_NOIO);
1937 if (!req)
1938 return NULL;
1939
1940 req->r_flags = flags;
1941 req->r_callback = rbd_osd_req_callback;
1942 req->r_priv = obj_request;
1943
1944 req->r_base_oloc.pool = rbd_dev->layout.pool_id;
1945 if (ceph_oid_aprintf(&req->r_base_oid, GFP_NOIO, name_format,
1946 rbd_dev->header.object_prefix, obj_request->object_no))
1947 goto err_req;
1948
1949 if (ceph_osdc_alloc_messages(req, GFP_NOIO))
1950 goto err_req;
1951
1952 return req;
1953
1954err_req:
1955 ceph_osdc_put_request(req);
1956 return NULL;
1957}
1958
1959/*
1960 * Create an osd request. A read request has one osd op (read).
1961 * A write request has either one (watch) or two (hint+write) osd ops.
1962 * (All rbd data writes are prefixed with an allocation hint op, but
1963 * technically osd watch is a write request, hence this distinction.)
1964 */
1965static struct ceph_osd_request *rbd_osd_req_create(
1966 struct rbd_device *rbd_dev,
1967 enum obj_operation_type op_type,
1968 unsigned int num_ops,
1969 struct rbd_obj_request *obj_request)
1970{
1971 struct ceph_snap_context *snapc = NULL;
1972
1973 if (obj_request_img_data_test(obj_request) &&
1974 (op_type == OBJ_OP_DISCARD || op_type == OBJ_OP_WRITE)) {
1975 struct rbd_img_request *img_request = obj_request->img_request;
1976 if (op_type == OBJ_OP_WRITE) {
1977 rbd_assert(img_request_write_test(img_request));
1978 } else {
1979 rbd_assert(img_request_discard_test(img_request));
1980 }
1981 snapc = img_request->snapc;
1982 }
1983
1984 rbd_assert(num_ops == 1 || ((op_type == OBJ_OP_WRITE) && num_ops == 2));
1985
1986 return __rbd_osd_req_create(rbd_dev, snapc, num_ops,
1987 (op_type == OBJ_OP_WRITE || op_type == OBJ_OP_DISCARD) ?
1988 CEPH_OSD_FLAG_WRITE : CEPH_OSD_FLAG_READ, obj_request);
1989}
1990
1991/*
1992 * Create a copyup osd request based on the information in the object
1993 * request supplied. A copyup request has two or three osd ops, a
1994 * copyup method call, potentially a hint op, and a write or truncate
1995 * or zero op.
1996 */
1997static struct ceph_osd_request *
1998rbd_osd_req_create_copyup(struct rbd_obj_request *obj_request)
1999{
2000 struct rbd_img_request *img_request;
2001 int num_osd_ops = 3;
2002
2003 rbd_assert(obj_request_img_data_test(obj_request));
2004 img_request = obj_request->img_request;
2005 rbd_assert(img_request);
2006 rbd_assert(img_request_write_test(img_request) ||
2007 img_request_discard_test(img_request));
2008
2009 if (img_request_discard_test(img_request))
2010 num_osd_ops = 2;
2011
2012 return __rbd_osd_req_create(img_request->rbd_dev,
2013 img_request->snapc, num_osd_ops,
2014 CEPH_OSD_FLAG_WRITE, obj_request);
2015}
2016
2017static void rbd_osd_req_destroy(struct ceph_osd_request *osd_req)
2018{
2019 ceph_osdc_put_request(osd_req);
2020}
2021
2022static struct rbd_obj_request *
2023rbd_obj_request_create(enum obj_request_type type)
2024{
2025 struct rbd_obj_request *obj_request;
2026
2027 rbd_assert(obj_request_type_valid(type));
2028
2029 obj_request = kmem_cache_zalloc(rbd_obj_request_cache, GFP_NOIO);
2030 if (!obj_request)
2031 return NULL;
2032
2033 obj_request->which = BAD_WHICH;
2034 obj_request->type = type;
2035 INIT_LIST_HEAD(&obj_request->links);
2036 init_completion(&obj_request->completion);
2037 kref_init(&obj_request->kref);
2038
2039 dout("%s %p\n", __func__, obj_request);
2040 return obj_request;
2041}
2042
2043static void rbd_obj_request_destroy(struct kref *kref)
2044{
2045 struct rbd_obj_request *obj_request;
2046
2047 obj_request = container_of(kref, struct rbd_obj_request, kref);
2048
2049 dout("%s: obj %p\n", __func__, obj_request);
2050
2051 rbd_assert(obj_request->img_request == NULL);
2052 rbd_assert(obj_request->which == BAD_WHICH);
2053
2054 if (obj_request->osd_req)
2055 rbd_osd_req_destroy(obj_request->osd_req);
2056
2057 rbd_assert(obj_request_type_valid(obj_request->type));
2058 switch (obj_request->type) {
2059 case OBJ_REQUEST_NODATA:
2060 break; /* Nothing to do */
2061 case OBJ_REQUEST_BIO:
2062 if (obj_request->bio_list)
2063 bio_chain_put(obj_request->bio_list);
2064 break;
2065 case OBJ_REQUEST_PAGES:
2066 /* img_data requests don't own their page array */
2067 if (obj_request->pages &&
2068 !obj_request_img_data_test(obj_request))
2069 ceph_release_page_vector(obj_request->pages,
2070 obj_request->page_count);
2071 break;
2072 }
2073
2074 kmem_cache_free(rbd_obj_request_cache, obj_request);
2075}
2076
2077/* It's OK to call this for a device with no parent */
2078
2079static void rbd_spec_put(struct rbd_spec *spec);
2080static void rbd_dev_unparent(struct rbd_device *rbd_dev)
2081{
2082 rbd_dev_remove_parent(rbd_dev);
2083 rbd_spec_put(rbd_dev->parent_spec);
2084 rbd_dev->parent_spec = NULL;
2085 rbd_dev->parent_overlap = 0;
2086}
2087
2088/*
2089 * Parent image reference counting is used to determine when an
2090 * image's parent fields can be safely torn down--after there are no
2091 * more in-flight requests to the parent image. When the last
2092 * reference is dropped, cleaning them up is safe.
2093 */
2094static void rbd_dev_parent_put(struct rbd_device *rbd_dev)
2095{
2096 int counter;
2097
2098 if (!rbd_dev->parent_spec)
2099 return;
2100
2101 counter = atomic_dec_return_safe(&rbd_dev->parent_ref);
2102 if (counter > 0)
2103 return;
2104
2105 /* Last reference; clean up parent data structures */
2106
2107 if (!counter)
2108 rbd_dev_unparent(rbd_dev);
2109 else
2110 rbd_warn(rbd_dev, "parent reference underflow");
2111}
2112
2113/*
2114 * If an image has a non-zero parent overlap, get a reference to its
2115 * parent.
2116 *
2117 * Returns true if the rbd device has a parent with a non-zero
2118 * overlap and a reference for it was successfully taken, or
2119 * false otherwise.
2120 */
2121static bool rbd_dev_parent_get(struct rbd_device *rbd_dev)
2122{
2123 int counter = 0;
2124
2125 if (!rbd_dev->parent_spec)
2126 return false;
2127
2128 down_read(&rbd_dev->header_rwsem);
2129 if (rbd_dev->parent_overlap)
2130 counter = atomic_inc_return_safe(&rbd_dev->parent_ref);
2131 up_read(&rbd_dev->header_rwsem);
2132
2133 if (counter < 0)
2134 rbd_warn(rbd_dev, "parent reference overflow");
2135
2136 return counter > 0;
2137}
2138
2139/*
2140 * Caller is responsible for filling in the list of object requests
2141 * that comprises the image request, and the Linux request pointer
2142 * (if there is one).
2143 */
2144static struct rbd_img_request *rbd_img_request_create(
2145 struct rbd_device *rbd_dev,
2146 u64 offset, u64 length,
2147 enum obj_operation_type op_type,
2148 struct ceph_snap_context *snapc)
2149{
2150 struct rbd_img_request *img_request;
2151
2152 img_request = kmem_cache_alloc(rbd_img_request_cache, GFP_NOIO);
2153 if (!img_request)
2154 return NULL;
2155
2156 img_request->rq = NULL;
2157 img_request->rbd_dev = rbd_dev;
2158 img_request->offset = offset;
2159 img_request->length = length;
2160 img_request->flags = 0;
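	/*
	 * For writes and discards the image request takes over the
	 * caller's snap context reference; it is dropped in
	 * rbd_img_request_destroy().  Reads instead record the
	 * mapped snapshot id.
	 */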
2161 if (op_type == OBJ_OP_DISCARD) {
2162 img_request_discard_set(img_request);
2163 img_request->snapc = snapc;
2164 } else if (op_type == OBJ_OP_WRITE) {
2165 img_request_write_set(img_request);
2166 img_request->snapc = snapc;
2167 } else {
2168 img_request->snap_id = rbd_dev->spec->snap_id;
2169 }
2170 if (rbd_dev_parent_get(rbd_dev))
2171 img_request_layered_set(img_request);
2172 spin_lock_init(&img_request->completion_lock);
2173 img_request->next_completion = 0;
2174 img_request->callback = NULL;
2175 img_request->result = 0;
2176 img_request->obj_request_count = 0;
2177 INIT_LIST_HEAD(&img_request->obj_requests);
2178 kref_init(&img_request->kref);
2179
2180 dout("%s: rbd_dev %p %s %llu/%llu -> img %p\n", __func__, rbd_dev,
2181 obj_op_name(op_type), offset, length, img_request);
2182
2183 return img_request;
2184}
2185
2186static void rbd_img_request_destroy(struct kref *kref)
2187{
2188 struct rbd_img_request *img_request;
2189 struct rbd_obj_request *obj_request;
2190 struct rbd_obj_request *next_obj_request;
2191
2192 img_request = container_of(kref, struct rbd_img_request, kref);
2193
2194 dout("%s: img %p\n", __func__, img_request);
2195
2196 for_each_obj_request_safe(img_request, obj_request, next_obj_request)
2197 rbd_img_obj_request_del(img_request, obj_request);
2198 rbd_assert(img_request->obj_request_count == 0);
2199
2200 if (img_request_layered_test(img_request)) {
2201 img_request_layered_clear(img_request);
2202 rbd_dev_parent_put(img_request->rbd_dev);
2203 }
2204
2205 if (img_request_write_test(img_request) ||
2206 img_request_discard_test(img_request))
2207 ceph_put_snap_context(img_request->snapc);
2208
2209 kmem_cache_free(rbd_img_request_cache, img_request);
2210}
2211
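/*
 * Create a child image request that reads the given range from the
 * parent image on behalf of obj_request.  The child request holds a
 * reference on obj_request, dropped in rbd_parent_request_destroy().
 */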
2212static struct rbd_img_request *rbd_parent_request_create(
2213 struct rbd_obj_request *obj_request,
2214 u64 img_offset, u64 length)
2215{
2216 struct rbd_img_request *parent_request;
2217 struct rbd_device *rbd_dev;
2218
2219 rbd_assert(obj_request->img_request);
2220 rbd_dev = obj_request->img_request->rbd_dev;
2221
2222 parent_request = rbd_img_request_create(rbd_dev->parent, img_offset,
2223 length, OBJ_OP_READ, NULL);
2224 if (!parent_request)
2225 return NULL;
2226
2227 img_request_child_set(parent_request);
2228 rbd_obj_request_get(obj_request);
2229 parent_request->obj_request = obj_request;
2230
2231 return parent_request;
2232}
2233
2234static void rbd_parent_request_destroy(struct kref *kref)
2235{
2236 struct rbd_img_request *parent_request;
2237 struct rbd_obj_request *orig_request;
2238
2239 parent_request = container_of(kref, struct rbd_img_request, kref);
2240 orig_request = parent_request->obj_request;
2241
2242 parent_request->obj_request = NULL;
2243 rbd_obj_request_put(orig_request);
2244 img_request_child_clear(parent_request);
2245
2246 rbd_img_request_destroy(kref);
2247}
2248
2249static bool rbd_img_obj_end_request(struct rbd_obj_request *obj_request)
2250{
2251 struct rbd_img_request *img_request;
2252 unsigned int xferred;
2253 int result;
2254 bool more;
2255
2256 rbd_assert(obj_request_img_data_test(obj_request));
2257 img_request = obj_request->img_request;
2258
2259 rbd_assert(obj_request->xferred <= (u64)UINT_MAX);
2260 xferred = (unsigned int)obj_request->xferred;
2261 result = obj_request->result;
2262 if (result) {
2263 struct rbd_device *rbd_dev = img_request->rbd_dev;
2264 enum obj_operation_type op_type;
2265
2266 if (img_request_discard_test(img_request))
2267 op_type = OBJ_OP_DISCARD;
2268 else if (img_request_write_test(img_request))
2269 op_type = OBJ_OP_WRITE;
2270 else
2271 op_type = OBJ_OP_READ;
2272
2273 rbd_warn(rbd_dev, "%s %llx at %llx (%llx)",
2274 obj_op_name(op_type), obj_request->length,
2275 obj_request->img_offset, obj_request->offset);
2276 rbd_warn(rbd_dev, " result %d xferred %x",
2277 result, xferred);
2278 if (!img_request->result)
2279 img_request->result = result;
2280 /*
 2281		 * Need to end I/O on the entire obj_request's worth of
2282 * bytes in case of error.
2283 */
2284 xferred = obj_request->length;
2285 }
2286
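	/*
	 * A child (parent read) image request completes back to its
	 * originating object request; anything else completes to the
	 * block layer, which tells us whether any of the request is
	 * still outstanding.
	 */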
2287 if (img_request_child_test(img_request)) {
2288 rbd_assert(img_request->obj_request != NULL);
2289 more = obj_request->which < img_request->obj_request_count - 1;
2290 } else {
2291 blk_status_t status = errno_to_blk_status(result);
2292
2293 rbd_assert(img_request->rq != NULL);
2294
2295 more = blk_update_request(img_request->rq, status, xferred);
2296 if (!more)
2297 __blk_mq_end_request(img_request->rq, status);
2298 }
2299
2300 return more;
2301}
2302
2303static void rbd_img_obj_callback(struct rbd_obj_request *obj_request)
2304{
2305 struct rbd_img_request *img_request;
2306 u32 which = obj_request->which;
2307 bool more = true;
2308
2309 rbd_assert(obj_request_img_data_test(obj_request));
2310 img_request = obj_request->img_request;
2311
2312 dout("%s: img %p obj %p\n", __func__, img_request, obj_request);
2313 rbd_assert(img_request != NULL);
2314 rbd_assert(img_request->obj_request_count > 0);
2315 rbd_assert(which != BAD_WHICH);
2316 rbd_assert(which < img_request->obj_request_count);
2317
2318 spin_lock_irq(&img_request->completion_lock);
2319 if (which != img_request->next_completion)
2320 goto out;
2321
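	/*
	 * Object requests are completed back to the block layer in
	 * order.  Walk forward from the next expected completion,
	 * ending every request that has already finished, and stop
	 * at the first one still in flight.
	 */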
2322 for_each_obj_request_from(img_request, obj_request) {
2323 rbd_assert(more);
2324 rbd_assert(which < img_request->obj_request_count);
2325
2326 if (!obj_request_done_test(obj_request))
2327 break;
2328 more = rbd_img_obj_end_request(obj_request);
2329 which++;
2330 }
2331
2332 rbd_assert(more ^ (which == img_request->obj_request_count));
2333 img_request->next_completion = which;
2334out:
2335 spin_unlock_irq(&img_request->completion_lock);
2336 rbd_img_request_put(img_request);
2337
2338 if (!more)
2339 rbd_img_request_complete(img_request);
2340}
2341
2342/*
2343 * Add individual osd ops to the given ceph_osd_request and prepare
2344 * them for submission. num_ops is the current number of
 2345 * osd operations already added to the given osd request.
2346 */
2347static void rbd_img_obj_request_fill(struct rbd_obj_request *obj_request,
2348 struct ceph_osd_request *osd_request,
2349 enum obj_operation_type op_type,
2350 unsigned int num_ops)
2351{
2352 struct rbd_img_request *img_request = obj_request->img_request;
2353 struct rbd_device *rbd_dev = img_request->rbd_dev;
2354 u64 object_size = rbd_obj_bytes(&rbd_dev->header);
2355 u64 offset = obj_request->offset;
2356 u64 length = obj_request->length;
2357 u64 img_end;
2358 u16 opcode;
2359
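	/*
	 * Map a discard onto the cheapest OSD op that is still
	 * correct: DELETE for a whole object with no parent data to
	 * preserve, TRUNCATE when the range runs to the end of the
	 * object (or of the image), and ZERO otherwise.
	 */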
2360 if (op_type == OBJ_OP_DISCARD) {
2361 if (!offset && length == object_size &&
2362 (!img_request_layered_test(img_request) ||
2363 !obj_request_overlaps_parent(obj_request))) {
2364 opcode = CEPH_OSD_OP_DELETE;
 2365		} else if (offset + length == object_size) {
2366 opcode = CEPH_OSD_OP_TRUNCATE;
2367 } else {
2368 down_read(&rbd_dev->header_rwsem);
2369 img_end = rbd_dev->header.image_size;
2370 up_read(&rbd_dev->header_rwsem);
2371
2372 if (obj_request->img_offset + length == img_end)
2373 opcode = CEPH_OSD_OP_TRUNCATE;
2374 else
2375 opcode = CEPH_OSD_OP_ZERO;
2376 }
2377 } else if (op_type == OBJ_OP_WRITE) {
2378 if (!offset && length == object_size)
2379 opcode = CEPH_OSD_OP_WRITEFULL;
2380 else
2381 opcode = CEPH_OSD_OP_WRITE;
2382 osd_req_op_alloc_hint_init(osd_request, num_ops,
2383 object_size, object_size);
2384 num_ops++;
2385 } else {
2386 opcode = CEPH_OSD_OP_READ;
2387 }
2388
2389 if (opcode == CEPH_OSD_OP_DELETE)
2390 osd_req_op_init(osd_request, num_ops, opcode, 0);
2391 else
2392 osd_req_op_extent_init(osd_request, num_ops, opcode,
2393 offset, length, 0, 0);
2394
2395 if (obj_request->type == OBJ_REQUEST_BIO)
2396 osd_req_op_extent_osd_data_bio(osd_request, num_ops,
2397 obj_request->bio_list, length);
2398 else if (obj_request->type == OBJ_REQUEST_PAGES)
2399 osd_req_op_extent_osd_data_pages(osd_request, num_ops,
2400 obj_request->pages, length,
2401 offset & ~PAGE_MASK, false, false);
2402
2403 /* Discards are also writes */
2404 if (op_type == OBJ_OP_WRITE || op_type == OBJ_OP_DISCARD)
2405 rbd_osd_req_format_write(obj_request);
2406 else
2407 rbd_osd_req_format_read(obj_request);
2408}
2409
2410/*
2411 * Split up an image request into one or more object requests, each
2412 * to a different object. The "type" parameter indicates whether
2413 * "data_desc" is the pointer to the head of a list of bio
2414 * structures, or the base of a page array. In either case this
2415 * function assumes data_desc describes memory sufficient to hold
2416 * all data described by the image request.
2417 */
2418static int rbd_img_request_fill(struct rbd_img_request *img_request,
2419 enum obj_request_type type,
2420 void *data_desc)
2421{
2422 struct rbd_device *rbd_dev = img_request->rbd_dev;
2423 struct rbd_obj_request *obj_request = NULL;
2424 struct rbd_obj_request *next_obj_request;
2425 struct bio *bio_list = NULL;
2426 unsigned int bio_offset = 0;
2427 struct page **pages = NULL;
2428 enum obj_operation_type op_type;
2429 u64 img_offset;
2430 u64 resid;
2431
2432 dout("%s: img %p type %d data_desc %p\n", __func__, img_request,
2433 (int)type, data_desc);
2434
2435 img_offset = img_request->offset;
2436 resid = img_request->length;
2437 rbd_assert(resid > 0);
2438 op_type = rbd_img_request_op_type(img_request);
2439
2440 if (type == OBJ_REQUEST_BIO) {
2441 bio_list = data_desc;
2442 rbd_assert(img_offset ==
2443 bio_list->bi_iter.bi_sector << SECTOR_SHIFT);
2444 } else if (type == OBJ_REQUEST_PAGES) {
2445 pages = data_desc;
2446 }
2447
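	/*
	 * Walk the image extent one object at a time.  Each pass maps
	 * the current image offset to an (object number, offset,
	 * length) triple that never crosses an object boundary.
	 */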
2448 while (resid) {
2449 struct ceph_osd_request *osd_req;
2450 u64 object_no = img_offset >> rbd_dev->header.obj_order;
2451 u64 offset = rbd_segment_offset(rbd_dev, img_offset);
2452 u64 length = rbd_segment_length(rbd_dev, img_offset, resid);
2453
2454 obj_request = rbd_obj_request_create(type);
2455 if (!obj_request)
2456 goto out_unwind;
2457
2458 obj_request->object_no = object_no;
2459 obj_request->offset = offset;
2460 obj_request->length = length;
2461
2462 /*
2463 * set obj_request->img_request before creating the
2464 * osd_request so that it gets the right snapc
2465 */
2466 rbd_img_obj_request_add(img_request, obj_request);
2467
2468 if (type == OBJ_REQUEST_BIO) {
2469 unsigned int clone_size;
2470
2471 rbd_assert(length <= (u64)UINT_MAX);
2472 clone_size = (unsigned int)length;
2473 obj_request->bio_list =
2474 bio_chain_clone_range(&bio_list,
2475 &bio_offset,
2476 clone_size,
2477 GFP_NOIO);
2478 if (!obj_request->bio_list)
2479 goto out_unwind;
2480 } else if (type == OBJ_REQUEST_PAGES) {
2481 unsigned int page_count;
2482
2483 obj_request->pages = pages;
2484 page_count = (u32)calc_pages_for(offset, length);
2485 obj_request->page_count = page_count;
2486 if ((offset + length) & ~PAGE_MASK)
2487 page_count--; /* more on last page */
2488 pages += page_count;
2489 }
2490
2491 osd_req = rbd_osd_req_create(rbd_dev, op_type,
2492 (op_type == OBJ_OP_WRITE) ? 2 : 1,
2493 obj_request);
2494 if (!osd_req)
2495 goto out_unwind;
2496
2497 obj_request->osd_req = osd_req;
2498 obj_request->callback = rbd_img_obj_callback;
2499 obj_request->img_offset = img_offset;
2500
2501 rbd_img_obj_request_fill(obj_request, osd_req, op_type, 0);
2502
2503 img_offset += length;
2504 resid -= length;
2505 }
2506
2507 return 0;
2508
2509out_unwind:
2510 for_each_obj_request_safe(img_request, obj_request, next_obj_request)
2511 rbd_img_obj_request_del(img_request, obj_request);
2512
2513 return -ENOMEM;
2514}
2515
2516static void
2517rbd_osd_copyup_callback(struct rbd_obj_request *obj_request)
2518{
2519 struct rbd_img_request *img_request;
2520 struct rbd_device *rbd_dev;
2521 struct page **pages;
2522 u32 page_count;
2523
2524 dout("%s: obj %p\n", __func__, obj_request);
2525
2526 rbd_assert(obj_request->type == OBJ_REQUEST_BIO ||
2527 obj_request->type == OBJ_REQUEST_NODATA);
2528 rbd_assert(obj_request_img_data_test(obj_request));
2529 img_request = obj_request->img_request;
2530 rbd_assert(img_request);
2531
2532 rbd_dev = img_request->rbd_dev;
2533 rbd_assert(rbd_dev);
2534
2535 pages = obj_request->copyup_pages;
2536 rbd_assert(pages != NULL);
2537 obj_request->copyup_pages = NULL;
2538 page_count = obj_request->copyup_page_count;
2539 rbd_assert(page_count);
2540 obj_request->copyup_page_count = 0;
2541 ceph_release_page_vector(pages, page_count);
2542
2543 /*
2544 * We want the transfer count to reflect the size of the
2545 * original write request. There is no such thing as a
2546 * successful short write, so if the request was successful
2547 * we can just set it to the originally-requested length.
2548 */
2549 if (!obj_request->result)
2550 obj_request->xferred = obj_request->length;
2551
2552 obj_request_done_set(obj_request);
2553}
2554
2555static void
2556rbd_img_obj_parent_read_full_callback(struct rbd_img_request *img_request)
2557{
2558 struct rbd_obj_request *orig_request;
2559 struct ceph_osd_request *osd_req;
2560 struct rbd_device *rbd_dev;
2561 struct page **pages;
2562 enum obj_operation_type op_type;
2563 u32 page_count;
2564 int img_result;
2565 u64 parent_length;
2566
2567 rbd_assert(img_request_child_test(img_request));
2568
2569 /* First get what we need from the image request */
2570
2571 pages = img_request->copyup_pages;
2572 rbd_assert(pages != NULL);
2573 img_request->copyup_pages = NULL;
2574 page_count = img_request->copyup_page_count;
2575 rbd_assert(page_count);
2576 img_request->copyup_page_count = 0;
2577
2578 orig_request = img_request->obj_request;
2579 rbd_assert(orig_request != NULL);
2580 rbd_assert(obj_request_type_valid(orig_request->type));
2581 img_result = img_request->result;
2582 parent_length = img_request->length;
2583 rbd_assert(img_result || parent_length == img_request->xferred);
2584 rbd_img_request_put(img_request);
2585
2586 rbd_assert(orig_request->img_request);
2587 rbd_dev = orig_request->img_request->rbd_dev;
2588 rbd_assert(rbd_dev);
2589
2590 /*
2591 * If the overlap has become 0 (most likely because the
2592 * image has been flattened) we need to free the pages
2593 * and re-submit the original write request.
2594 */
2595 if (!rbd_dev->parent_overlap) {
2596 ceph_release_page_vector(pages, page_count);
2597 rbd_obj_request_submit(orig_request);
2598 return;
2599 }
2600
2601 if (img_result)
2602 goto out_err;
2603
2604 /*
 2605	 * The original osd request is of no use to us any more.
 2606	 * We need a new one that can hold the two or three ops in a copyup
2607 * request. Allocate the new copyup osd request for the
2608 * original request, and release the old one.
2609 */
2610 img_result = -ENOMEM;
2611 osd_req = rbd_osd_req_create_copyup(orig_request);
2612 if (!osd_req)
2613 goto out_err;
2614 rbd_osd_req_destroy(orig_request->osd_req);
2615 orig_request->osd_req = osd_req;
2616 orig_request->copyup_pages = pages;
2617 orig_request->copyup_page_count = page_count;
2618
2619 /* Initialize the copyup op */
2620
2621 osd_req_op_cls_init(osd_req, 0, CEPH_OSD_OP_CALL, "rbd", "copyup");
2622 osd_req_op_cls_request_data_pages(osd_req, 0, pages, parent_length, 0,
2623 false, false);
2624
2625 /* Add the other op(s) */
2626
2627 op_type = rbd_img_request_op_type(orig_request->img_request);
2628 rbd_img_obj_request_fill(orig_request, osd_req, op_type, 1);
2629
2630 /* All set, send it off. */
2631
2632 rbd_obj_request_submit(orig_request);
2633 return;
2634
2635out_err:
2636 ceph_release_page_vector(pages, page_count);
2637 rbd_obj_request_error(orig_request, img_result);
2638}
2639
2640/*
2641 * Read from the parent image the range of data that covers the
2642 * entire target of the given object request. This is used for
2643 * satisfying a layered image write request when the target of an
2644 * object request from the image request does not exist.
2645 *
2646 * A page array big enough to hold the returned data is allocated
2647 * and supplied to rbd_img_request_fill() as the "data descriptor."
2648 * When the read completes, this page array will be transferred to
2649 * the original object request for the copyup operation.
2650 *
2651 * If an error occurs, it is recorded as the result of the original
2652 * object request in rbd_img_obj_exists_callback().
2653 */
2654static int rbd_img_obj_parent_read_full(struct rbd_obj_request *obj_request)
2655{
2656 struct rbd_device *rbd_dev = obj_request->img_request->rbd_dev;
2657 struct rbd_img_request *parent_request = NULL;
2658 u64 img_offset;
2659 u64 length;
2660 struct page **pages = NULL;
2661 u32 page_count;
2662 int result;
2663
2664 rbd_assert(rbd_dev->parent != NULL);
2665
2666 /*
2667 * Determine the byte range covered by the object in the
2668 * child image to which the original request was to be sent.
2669 */
2670 img_offset = obj_request->img_offset - obj_request->offset;
2671 length = rbd_obj_bytes(&rbd_dev->header);
2672
2673 /*
2674 * There is no defined parent data beyond the parent
2675 * overlap, so limit what we read at that boundary if
2676 * necessary.
2677 */
2678 if (img_offset + length > rbd_dev->parent_overlap) {
2679 rbd_assert(img_offset < rbd_dev->parent_overlap);
2680 length = rbd_dev->parent_overlap - img_offset;
2681 }
2682
2683 /*
2684 * Allocate a page array big enough to receive the data read
2685 * from the parent.
2686 */
2687 page_count = (u32)calc_pages_for(0, length);
2688 pages = ceph_alloc_page_vector(page_count, GFP_NOIO);
2689 if (IS_ERR(pages)) {
2690 result = PTR_ERR(pages);
2691 pages = NULL;
2692 goto out_err;
2693 }
2694
2695 result = -ENOMEM;
2696 parent_request = rbd_parent_request_create(obj_request,
2697 img_offset, length);
2698 if (!parent_request)
2699 goto out_err;
2700
2701 result = rbd_img_request_fill(parent_request, OBJ_REQUEST_PAGES, pages);
2702 if (result)
2703 goto out_err;
2704
2705 parent_request->copyup_pages = pages;
2706 parent_request->copyup_page_count = page_count;
2707 parent_request->callback = rbd_img_obj_parent_read_full_callback;
2708
2709 result = rbd_img_request_submit(parent_request);
2710 if (!result)
2711 return 0;
2712
2713 parent_request->copyup_pages = NULL;
2714 parent_request->copyup_page_count = 0;
2715 parent_request->obj_request = NULL;
2716 rbd_obj_request_put(obj_request);
2717out_err:
2718 if (pages)
2719 ceph_release_page_vector(pages, page_count);
2720 if (parent_request)
2721 rbd_img_request_put(parent_request);
2722 return result;
2723}
2724
2725static void rbd_img_obj_exists_callback(struct rbd_obj_request *obj_request)
2726{
2727 struct rbd_obj_request *orig_request;
2728 struct rbd_device *rbd_dev;
2729 int result;
2730
2731 rbd_assert(!obj_request_img_data_test(obj_request));
2732
2733 /*
2734 * All we need from the object request is the original
2735 * request and the result of the STAT op. Grab those, then
2736 * we're done with the request.
2737 */
2738 orig_request = obj_request->obj_request;
2739 obj_request->obj_request = NULL;
2740 rbd_obj_request_put(orig_request);
2741 rbd_assert(orig_request);
2742 rbd_assert(orig_request->img_request);
2743
2744 result = obj_request->result;
2745 obj_request->result = 0;
2746
2747 dout("%s: obj %p for obj %p result %d %llu/%llu\n", __func__,
2748 obj_request, orig_request, result,
2749 obj_request->xferred, obj_request->length);
2750 rbd_obj_request_put(obj_request);
2751
2752 /*
2753 * If the overlap has become 0 (most likely because the
2754 * image has been flattened) we need to re-submit the
2755 * original request.
2756 */
2757 rbd_dev = orig_request->img_request->rbd_dev;
2758 if (!rbd_dev->parent_overlap) {
2759 rbd_obj_request_submit(orig_request);
2760 return;
2761 }
2762
2763 /*
2764 * Our only purpose here is to determine whether the object
2765 * exists, and we don't want to treat the non-existence as
2766 * an error. If something else comes back, transfer the
2767 * error to the original request and complete it now.
2768 */
2769 if (!result) {
2770 obj_request_existence_set(orig_request, true);
2771 } else if (result == -ENOENT) {
2772 obj_request_existence_set(orig_request, false);
2773 } else {
2774 goto fail_orig_request;
2775 }
2776
2777 /*
2778 * Resubmit the original request now that we have recorded
2779 * whether the target object exists.
2780 */
2781 result = rbd_img_obj_request_submit(orig_request);
2782 if (result)
2783 goto fail_orig_request;
2784
2785 return;
2786
2787fail_orig_request:
2788 rbd_obj_request_error(orig_request, result);
2789}
2790
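/*
 * Issue a STAT op for the target object of a layered write, so that
 * we learn whether it exists before choosing between a plain write
 * and a copyup.  The result is handled by rbd_img_obj_exists_callback().
 */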
2791static int rbd_img_obj_exists_submit(struct rbd_obj_request *obj_request)
2792{
2793 struct rbd_device *rbd_dev = obj_request->img_request->rbd_dev;
2794 struct rbd_obj_request *stat_request;
2795 struct page **pages;
2796 u32 page_count;
2797 size_t size;
2798 int ret;
2799
2800 stat_request = rbd_obj_request_create(OBJ_REQUEST_PAGES);
2801 if (!stat_request)
2802 return -ENOMEM;
2803
2804 stat_request->object_no = obj_request->object_no;
2805
2806 stat_request->osd_req = rbd_osd_req_create(rbd_dev, OBJ_OP_READ, 1,
2807 stat_request);
2808 if (!stat_request->osd_req) {
2809 ret = -ENOMEM;
2810 goto fail_stat_request;
2811 }
2812
2813 /*
2814 * The response data for a STAT call consists of:
2815 * le64 length;
2816 * struct {
2817 * le32 tv_sec;
2818 * le32 tv_nsec;
2819 * } mtime;
2820 */
2821 size = sizeof (__le64) + sizeof (__le32) + sizeof (__le32);
2822 page_count = (u32)calc_pages_for(0, size);
2823 pages = ceph_alloc_page_vector(page_count, GFP_NOIO);
2824 if (IS_ERR(pages)) {
2825 ret = PTR_ERR(pages);
2826 goto fail_stat_request;
2827 }
2828
2829 osd_req_op_init(stat_request->osd_req, 0, CEPH_OSD_OP_STAT, 0);
2830 osd_req_op_raw_data_in_pages(stat_request->osd_req, 0, pages, size, 0,
2831 false, false);
2832
2833 rbd_obj_request_get(obj_request);
2834 stat_request->obj_request = obj_request;
2835 stat_request->pages = pages;
2836 stat_request->page_count = page_count;
2837 stat_request->callback = rbd_img_obj_exists_callback;
2838
2839 rbd_obj_request_submit(stat_request);
2840 return 0;
2841
2842fail_stat_request:
2843 rbd_obj_request_put(stat_request);
2844 return ret;
2845}
2846
2847static bool img_obj_request_simple(struct rbd_obj_request *obj_request)
2848{
2849 struct rbd_img_request *img_request = obj_request->img_request;
2850 struct rbd_device *rbd_dev = img_request->rbd_dev;
2851
2852 /* Reads */
2853 if (!img_request_write_test(img_request) &&
2854 !img_request_discard_test(img_request))
2855 return true;
2856
2857 /* Non-layered writes */
2858 if (!img_request_layered_test(img_request))
2859 return true;
2860
2861 /*
2862 * Layered writes outside of the parent overlap range don't
2863 * share any data with the parent.
2864 */
2865 if (!obj_request_overlaps_parent(obj_request))
2866 return true;
2867
2868 /*
2869 * Entire-object layered writes - we will overwrite whatever
2870 * parent data there is anyway.
2871 */
2872 if (!obj_request->offset &&
2873 obj_request->length == rbd_obj_bytes(&rbd_dev->header))
2874 return true;
2875
2876 /*
2877 * If the object is known to already exist, its parent data has
2878 * already been copied.
2879 */
2880 if (obj_request_known_test(obj_request) &&
2881 obj_request_exists_test(obj_request))
2882 return true;
2883
2884 return false;
2885}
2886
2887static int rbd_img_obj_request_submit(struct rbd_obj_request *obj_request)
2888{
2889 rbd_assert(obj_request_img_data_test(obj_request));
2890 rbd_assert(obj_request_type_valid(obj_request->type));
2891 rbd_assert(obj_request->img_request);
2892
2893 if (img_obj_request_simple(obj_request)) {
2894 rbd_obj_request_submit(obj_request);
2895 return 0;
2896 }
2897
2898 /*
2899 * It's a layered write. The target object might exist but
2900 * we may not know that yet. If we know it doesn't exist,
2901 * start by reading the data for the full target object from
2902 * the parent so we can use it for a copyup to the target.
2903 */
2904 if (obj_request_known_test(obj_request))
2905 return rbd_img_obj_parent_read_full(obj_request);
2906
2907 /* We don't know whether the target exists. Go find out. */
2908
2909 return rbd_img_obj_exists_submit(obj_request);
2910}
2911
2912static int rbd_img_request_submit(struct rbd_img_request *img_request)
2913{
2914 struct rbd_obj_request *obj_request;
2915 struct rbd_obj_request *next_obj_request;
2916 int ret = 0;
2917
2918 dout("%s: img %p\n", __func__, img_request);
2919
2920 rbd_img_request_get(img_request);
2921 for_each_obj_request_safe(img_request, obj_request, next_obj_request) {
2922 ret = rbd_img_obj_request_submit(obj_request);
2923 if (ret)
2924 goto out_put_ireq;
2925 }
2926
2927out_put_ireq:
2928 rbd_img_request_put(img_request);
2929 return ret;
2930}
2931
2932static void rbd_img_parent_read_callback(struct rbd_img_request *img_request)
2933{
2934 struct rbd_obj_request *obj_request;
2935 struct rbd_device *rbd_dev;
2936 u64 obj_end;
2937 u64 img_xferred;
2938 int img_result;
2939
2940 rbd_assert(img_request_child_test(img_request));
2941
2942 /* First get what we need from the image request and release it */
2943
2944 obj_request = img_request->obj_request;
2945 img_xferred = img_request->xferred;
2946 img_result = img_request->result;
2947 rbd_img_request_put(img_request);
2948
2949 /*
2950 * If the overlap has become 0 (most likely because the
2951 * image has been flattened) we need to re-submit the
2952 * original request.
2953 */
2954 rbd_assert(obj_request);
2955 rbd_assert(obj_request->img_request);
2956 rbd_dev = obj_request->img_request->rbd_dev;
2957 if (!rbd_dev->parent_overlap) {
2958 rbd_obj_request_submit(obj_request);
2959 return;
2960 }
2961
2962 obj_request->result = img_result;
2963 if (obj_request->result)
2964 goto out;
2965
2966 /*
2967 * We need to zero anything beyond the parent overlap
2968 * boundary. Since rbd_img_obj_request_read_callback()
2969 * will zero anything beyond the end of a short read, an
2970 * easy way to do this is to pretend the data from the
2971 * parent came up short--ending at the overlap boundary.
2972 */
2973 rbd_assert(obj_request->img_offset < U64_MAX - obj_request->length);
2974 obj_end = obj_request->img_offset + obj_request->length;
2975 if (obj_end > rbd_dev->parent_overlap) {
2976 u64 xferred = 0;
2977
2978 if (obj_request->img_offset < rbd_dev->parent_overlap)
2979 xferred = rbd_dev->parent_overlap -
2980 obj_request->img_offset;
2981
2982 obj_request->xferred = min(img_xferred, xferred);
2983 } else {
2984 obj_request->xferred = img_xferred;
2985 }
2986out:
2987 rbd_img_obj_request_read_callback(obj_request);
2988 rbd_obj_request_complete(obj_request);
2989}
2990
2991static void rbd_img_parent_read(struct rbd_obj_request *obj_request)
2992{
2993 struct rbd_img_request *img_request;
2994 int result;
2995
2996 rbd_assert(obj_request_img_data_test(obj_request));
2997 rbd_assert(obj_request->img_request != NULL);
2998 rbd_assert(obj_request->result == (s32) -ENOENT);
2999 rbd_assert(obj_request_type_valid(obj_request->type));
3000
3001 /* rbd_read_finish(obj_request, obj_request->length); */
3002 img_request = rbd_parent_request_create(obj_request,
3003 obj_request->img_offset,
3004 obj_request->length);
3005 result = -ENOMEM;
3006 if (!img_request)
3007 goto out_err;
3008
3009 if (obj_request->type == OBJ_REQUEST_BIO)
3010 result = rbd_img_request_fill(img_request, OBJ_REQUEST_BIO,
3011 obj_request->bio_list);
3012 else
3013 result = rbd_img_request_fill(img_request, OBJ_REQUEST_PAGES,
3014 obj_request->pages);
3015 if (result)
3016 goto out_err;
3017
3018 img_request->callback = rbd_img_parent_read_callback;
3019 result = rbd_img_request_submit(img_request);
3020 if (result)
3021 goto out_err;
3022
3023 return;
3024out_err:
3025 if (img_request)
3026 rbd_img_request_put(img_request);
3027 obj_request->result = result;
3028 obj_request->xferred = 0;
3029 obj_request_done_set(obj_request);
3030}
3031
3032static const struct rbd_client_id rbd_empty_cid;
3033
3034static bool rbd_cid_equal(const struct rbd_client_id *lhs,
3035 const struct rbd_client_id *rhs)
3036{
3037 return lhs->gid == rhs->gid && lhs->handle == rhs->handle;
3038}
3039
3040static struct rbd_client_id rbd_get_cid(struct rbd_device *rbd_dev)
3041{
3042 struct rbd_client_id cid;
3043
3044 mutex_lock(&rbd_dev->watch_mutex);
3045 cid.gid = ceph_client_gid(rbd_dev->rbd_client->client);
3046 cid.handle = rbd_dev->watch_cookie;
3047 mutex_unlock(&rbd_dev->watch_mutex);
3048 return cid;
3049}
3050
3051/*
3052 * lock_rwsem must be held for write
3053 */
3054static void rbd_set_owner_cid(struct rbd_device *rbd_dev,
3055 const struct rbd_client_id *cid)
3056{
3057 dout("%s rbd_dev %p %llu-%llu -> %llu-%llu\n", __func__, rbd_dev,
3058 rbd_dev->owner_cid.gid, rbd_dev->owner_cid.handle,
3059 cid->gid, cid->handle);
3060 rbd_dev->owner_cid = *cid; /* struct */
3061}
3062
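/*
 * The lock cookie embeds the watch cookie so the lock owner can be
 * correlated with its watch, e.g. "auto 93892" (assuming
 * RBD_LOCK_COOKIE_PREFIX is "auto", matching userspace librbd).
 */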
3063static void format_lock_cookie(struct rbd_device *rbd_dev, char *buf)
3064{
3065 mutex_lock(&rbd_dev->watch_mutex);
3066 sprintf(buf, "%s %llu", RBD_LOCK_COOKIE_PREFIX, rbd_dev->watch_cookie);
3067 mutex_unlock(&rbd_dev->watch_mutex);
3068}
3069
3070static void __rbd_lock(struct rbd_device *rbd_dev, const char *cookie)
3071{
3072 struct rbd_client_id cid = rbd_get_cid(rbd_dev);
3073
3074 strcpy(rbd_dev->lock_cookie, cookie);
3075 rbd_set_owner_cid(rbd_dev, &cid);
3076 queue_work(rbd_dev->task_wq, &rbd_dev->acquired_lock_work);
3077}
3078
3079/*
3080 * lock_rwsem must be held for write
3081 */
3082static int rbd_lock(struct rbd_device *rbd_dev)
3083{
3084 struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc;
3085 char cookie[32];
3086 int ret;
3087
3088 WARN_ON(__rbd_is_lock_owner(rbd_dev) ||
3089 rbd_dev->lock_cookie[0] != '\0');
3090
3091 format_lock_cookie(rbd_dev, cookie);
3092 ret = ceph_cls_lock(osdc, &rbd_dev->header_oid, &rbd_dev->header_oloc,
3093 RBD_LOCK_NAME, CEPH_CLS_LOCK_EXCLUSIVE, cookie,
3094 RBD_LOCK_TAG, "", 0);
3095 if (ret)
3096 return ret;
3097
3098 rbd_dev->lock_state = RBD_LOCK_STATE_LOCKED;
3099 __rbd_lock(rbd_dev, cookie);
3100 return 0;
3101}
3102
3103/*
3104 * lock_rwsem must be held for write
3105 */
3106static void rbd_unlock(struct rbd_device *rbd_dev)
3107{
3108 struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc;
3109 int ret;
3110
3111 WARN_ON(!__rbd_is_lock_owner(rbd_dev) ||
3112 rbd_dev->lock_cookie[0] == '\0');
3113
3114 ret = ceph_cls_unlock(osdc, &rbd_dev->header_oid, &rbd_dev->header_oloc,
3115 RBD_LOCK_NAME, rbd_dev->lock_cookie);
3116 if (ret && ret != -ENOENT)
3117 rbd_warn(rbd_dev, "failed to unlock: %d", ret);
3118
 3119	/* treat errors as though the image were unlocked */
3120 rbd_dev->lock_state = RBD_LOCK_STATE_UNLOCKED;
3121 rbd_dev->lock_cookie[0] = '\0';
3122 rbd_set_owner_cid(rbd_dev, &rbd_empty_cid);
3123 queue_work(rbd_dev->task_wq, &rbd_dev->released_lock_work);
3124}
3125
3126static int __rbd_notify_op_lock(struct rbd_device *rbd_dev,
3127 enum rbd_notify_op notify_op,
3128 struct page ***preply_pages,
3129 size_t *preply_len)
3130{
3131 struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc;
3132 struct rbd_client_id cid = rbd_get_cid(rbd_dev);
3133 int buf_size = 4 + 8 + 8 + CEPH_ENCODING_START_BLK_LEN;
3134 char buf[buf_size];
3135 void *p = buf;
3136
3137 dout("%s rbd_dev %p notify_op %d\n", __func__, rbd_dev, notify_op);
3138
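	/*
	 * buf_size accounts for the encoding envelope (assumed to be
	 * a u8 struct_v, a u8 compat version and a le32 length, i.e.
	 * CEPH_ENCODING_START_BLK_LEN bytes) plus a le32 notify_op
	 * and the two le64 halves of our client id.
	 */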
3139 /* encode *LockPayload NotifyMessage (op + ClientId) */
3140 ceph_start_encoding(&p, 2, 1, buf_size - CEPH_ENCODING_START_BLK_LEN);
3141 ceph_encode_32(&p, notify_op);
3142 ceph_encode_64(&p, cid.gid);
3143 ceph_encode_64(&p, cid.handle);
3144
3145 return ceph_osdc_notify(osdc, &rbd_dev->header_oid,
3146 &rbd_dev->header_oloc, buf, buf_size,
3147 RBD_NOTIFY_TIMEOUT, preply_pages, preply_len);
3148}
3149
3150static void rbd_notify_op_lock(struct rbd_device *rbd_dev,
3151 enum rbd_notify_op notify_op)
3152{
3153 struct page **reply_pages;
3154 size_t reply_len;
3155
3156 __rbd_notify_op_lock(rbd_dev, notify_op, &reply_pages, &reply_len);
3157 ceph_release_page_vector(reply_pages, calc_pages_for(0, reply_len));
3158}
3159
3160static void rbd_notify_acquired_lock(struct work_struct *work)
3161{
3162 struct rbd_device *rbd_dev = container_of(work, struct rbd_device,
3163 acquired_lock_work);
3164
3165 rbd_notify_op_lock(rbd_dev, RBD_NOTIFY_OP_ACQUIRED_LOCK);
3166}
3167
3168static void rbd_notify_released_lock(struct work_struct *work)
3169{
3170 struct rbd_device *rbd_dev = container_of(work, struct rbd_device,
3171 released_lock_work);
3172
3173 rbd_notify_op_lock(rbd_dev, RBD_NOTIFY_OP_RELEASED_LOCK);
3174}
3175
3176static int rbd_request_lock(struct rbd_device *rbd_dev)
3177{
3178 struct page **reply_pages;
3179 size_t reply_len;
3180 bool lock_owner_responded = false;
3181 int ret;
3182
3183 dout("%s rbd_dev %p\n", __func__, rbd_dev);
3184
3185 ret = __rbd_notify_op_lock(rbd_dev, RBD_NOTIFY_OP_REQUEST_LOCK,
3186 &reply_pages, &reply_len);
3187 if (ret && ret != -ETIMEDOUT) {
3188 rbd_warn(rbd_dev, "failed to request lock: %d", ret);
3189 goto out;
3190 }
3191
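	/*
	 * The notify reply carries one ack per watcher.  Only the
	 * lock owner responds with a non-empty ResponseMessage
	 * payload; its 32-bit result tells us whether the owner is
	 * willing to release the lock.
	 */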
3192 if (reply_len > 0 && reply_len <= PAGE_SIZE) {
3193 void *p = page_address(reply_pages[0]);
3194 void *const end = p + reply_len;
3195 u32 n;
3196
3197 ceph_decode_32_safe(&p, end, n, e_inval); /* num_acks */
3198 while (n--) {
3199 u8 struct_v;
3200 u32 len;
3201
3202 ceph_decode_need(&p, end, 8 + 8, e_inval);
3203 p += 8 + 8; /* skip gid and cookie */
3204
3205 ceph_decode_32_safe(&p, end, len, e_inval);
3206 if (!len)
3207 continue;
3208
3209 if (lock_owner_responded) {
3210 rbd_warn(rbd_dev,
3211 "duplicate lock owners detected");
3212 ret = -EIO;
3213 goto out;
3214 }
3215
3216 lock_owner_responded = true;
3217 ret = ceph_start_decoding(&p, end, 1, "ResponseMessage",
3218 &struct_v, &len);
3219 if (ret) {
3220 rbd_warn(rbd_dev,
3221 "failed to decode ResponseMessage: %d",
3222 ret);
3223 goto e_inval;
3224 }
3225
3226 ret = ceph_decode_32(&p);
3227 }
3228 }
3229
3230 if (!lock_owner_responded) {
3231 rbd_warn(rbd_dev, "no lock owners detected");
3232 ret = -ETIMEDOUT;
3233 }
3234
3235out:
3236 ceph_release_page_vector(reply_pages, calc_pages_for(0, reply_len));
3237 return ret;
3238
3239e_inval:
3240 ret = -EINVAL;
3241 goto out;
3242}
3243
3244static void wake_requests(struct rbd_device *rbd_dev, bool wake_all)
3245{
3246 dout("%s rbd_dev %p wake_all %d\n", __func__, rbd_dev, wake_all);
3247
3248 cancel_delayed_work(&rbd_dev->lock_dwork);
3249 if (wake_all)
3250 wake_up_all(&rbd_dev->lock_waitq);
3251 else
3252 wake_up(&rbd_dev->lock_waitq);
3253}
3254
3255static int get_lock_owner_info(struct rbd_device *rbd_dev,
3256 struct ceph_locker **lockers, u32 *num_lockers)
3257{
3258 struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc;
3259 u8 lock_type;
3260 char *lock_tag;
3261 int ret;
3262
3263 dout("%s rbd_dev %p\n", __func__, rbd_dev);
3264
3265 ret = ceph_cls_lock_info(osdc, &rbd_dev->header_oid,
3266 &rbd_dev->header_oloc, RBD_LOCK_NAME,
3267 &lock_type, &lock_tag, lockers, num_lockers);
3268 if (ret)
3269 return ret;
3270
3271 if (*num_lockers == 0) {
3272 dout("%s rbd_dev %p no lockers detected\n", __func__, rbd_dev);
3273 goto out;
3274 }
3275
3276 if (strcmp(lock_tag, RBD_LOCK_TAG)) {
3277 rbd_warn(rbd_dev, "locked by external mechanism, tag %s",
3278 lock_tag);
3279 ret = -EBUSY;
3280 goto out;
3281 }
3282
3283 if (lock_type == CEPH_CLS_LOCK_SHARED) {
3284 rbd_warn(rbd_dev, "shared lock type detected");
3285 ret = -EBUSY;
3286 goto out;
3287 }
3288
3289 if (strncmp((*lockers)[0].id.cookie, RBD_LOCK_COOKIE_PREFIX,
3290 strlen(RBD_LOCK_COOKIE_PREFIX))) {
3291 rbd_warn(rbd_dev, "locked by external mechanism, cookie %s",
3292 (*lockers)[0].id.cookie);
3293 ret = -EBUSY;
3294 goto out;
3295 }
3296
3297out:
3298 kfree(lock_tag);
3299 return ret;
3300}
3301
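/*
 * Check whether the given locker still has a watch established on
 * the header object.  Returns 1 and records the owner's client id if
 * it does, 0 if it does not, or a negative error code.
 */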
3302static int find_watcher(struct rbd_device *rbd_dev,
3303 const struct ceph_locker *locker)
3304{
3305 struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc;
3306 struct ceph_watch_item *watchers;
3307 u32 num_watchers;
3308 u64 cookie;
3309 int i;
3310 int ret;
3311
3312 ret = ceph_osdc_list_watchers(osdc, &rbd_dev->header_oid,
3313 &rbd_dev->header_oloc, &watchers,
3314 &num_watchers);
3315 if (ret)
3316 return ret;
3317
3318 sscanf(locker->id.cookie, RBD_LOCK_COOKIE_PREFIX " %llu", &cookie);
3319 for (i = 0; i < num_watchers; i++) {
3320 if (!memcmp(&watchers[i].addr, &locker->info.addr,
3321 sizeof(locker->info.addr)) &&
3322 watchers[i].cookie == cookie) {
3323 struct rbd_client_id cid = {
3324 .gid = le64_to_cpu(watchers[i].name.num),
3325 .handle = cookie,
3326 };
3327
3328 dout("%s rbd_dev %p found cid %llu-%llu\n", __func__,
3329 rbd_dev, cid.gid, cid.handle);
3330 rbd_set_owner_cid(rbd_dev, &cid);
3331 ret = 1;
3332 goto out;
3333 }
3334 }
3335
3336 dout("%s rbd_dev %p no watchers\n", __func__, rbd_dev);
3337 ret = 0;
3338out:
3339 kfree(watchers);
3340 return ret;
3341}
3342
3343/*
3344 * lock_rwsem must be held for write
3345 */
3346static int rbd_try_lock(struct rbd_device *rbd_dev)
3347{
3348 struct ceph_client *client = rbd_dev->rbd_client->client;
3349 struct ceph_locker *lockers;
3350 u32 num_lockers;
3351 int ret;
3352
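	/*
	 * Try to take the lock; on -EBUSY, look up the current
	 * holder.  A holder without an established watch is presumed
	 * dead, so blacklist it and break its lock before retrying.
	 */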
3353 for (;;) {
3354 ret = rbd_lock(rbd_dev);
3355 if (ret != -EBUSY)
3356 return ret;
3357
3358 /* determine if the current lock holder is still alive */
3359 ret = get_lock_owner_info(rbd_dev, &lockers, &num_lockers);
3360 if (ret)
3361 return ret;
3362
3363 if (num_lockers == 0)
3364 goto again;
3365
3366 ret = find_watcher(rbd_dev, lockers);
3367 if (ret) {
3368 if (ret > 0)
3369 ret = 0; /* have to request lock */
3370 goto out;
3371 }
3372
3373 rbd_warn(rbd_dev, "%s%llu seems dead, breaking lock",
3374 ENTITY_NAME(lockers[0].id.name));
3375
3376 ret = ceph_monc_blacklist_add(&client->monc,
3377 &lockers[0].info.addr);
3378 if (ret) {
3379 rbd_warn(rbd_dev, "blacklist of %s%llu failed: %d",
3380 ENTITY_NAME(lockers[0].id.name), ret);
3381 goto out;
3382 }
3383
3384 ret = ceph_cls_break_lock(&client->osdc, &rbd_dev->header_oid,
3385 &rbd_dev->header_oloc, RBD_LOCK_NAME,
3386 lockers[0].id.cookie,
3387 &lockers[0].id.name);
3388 if (ret && ret != -ENOENT)
3389 goto out;
3390
3391again:
3392 ceph_free_lockers(lockers, num_lockers);
3393 }
3394
3395out:
3396 ceph_free_lockers(lockers, num_lockers);
3397 return ret;
3398}
3399
3400/*
3401 * ret is set only if lock_state is RBD_LOCK_STATE_UNLOCKED
3402 */
3403static enum rbd_lock_state rbd_try_acquire_lock(struct rbd_device *rbd_dev,
3404 int *pret)
3405{
3406 enum rbd_lock_state lock_state;
3407
3408 down_read(&rbd_dev->lock_rwsem);
3409 dout("%s rbd_dev %p read lock_state %d\n", __func__, rbd_dev,
3410 rbd_dev->lock_state);
3411 if (__rbd_is_lock_owner(rbd_dev)) {
3412 lock_state = rbd_dev->lock_state;
3413 up_read(&rbd_dev->lock_rwsem);
3414 return lock_state;
3415 }
3416
3417 up_read(&rbd_dev->lock_rwsem);
3418 down_write(&rbd_dev->lock_rwsem);
3419 dout("%s rbd_dev %p write lock_state %d\n", __func__, rbd_dev,
3420 rbd_dev->lock_state);
3421 if (!__rbd_is_lock_owner(rbd_dev)) {
3422 *pret = rbd_try_lock(rbd_dev);
3423 if (*pret)
3424 rbd_warn(rbd_dev, "failed to acquire lock: %d", *pret);
3425 }
3426
3427 lock_state = rbd_dev->lock_state;
3428 up_write(&rbd_dev->lock_rwsem);
3429 return lock_state;
3430}
3431
3432static void rbd_acquire_lock(struct work_struct *work)
3433{
3434 struct rbd_device *rbd_dev = container_of(to_delayed_work(work),
3435 struct rbd_device, lock_dwork);
3436 enum rbd_lock_state lock_state;
3437 int ret = 0;
3438
3439 dout("%s rbd_dev %p\n", __func__, rbd_dev);
3440again:
3441 lock_state = rbd_try_acquire_lock(rbd_dev, &ret);
3442 if (lock_state != RBD_LOCK_STATE_UNLOCKED || ret == -EBLACKLISTED) {
3443 if (lock_state == RBD_LOCK_STATE_LOCKED)
3444 wake_requests(rbd_dev, true);
3445 dout("%s rbd_dev %p lock_state %d ret %d - done\n", __func__,
3446 rbd_dev, lock_state, ret);
3447 return;
3448 }
3449
3450 ret = rbd_request_lock(rbd_dev);
3451 if (ret == -ETIMEDOUT) {
3452 goto again; /* treat this as a dead client */
3453 } else if (ret == -EROFS) {
3454 rbd_warn(rbd_dev, "peer will not release lock");
3455 /*
3456 * If this is rbd_add_acquire_lock(), we want to fail
3457 * immediately -- reuse BLACKLISTED flag. Otherwise we
3458 * want to block.
3459 */
3460 if (!(rbd_dev->disk->flags & GENHD_FL_UP)) {
3461 set_bit(RBD_DEV_FLAG_BLACKLISTED, &rbd_dev->flags);
3462 /* wake "rbd map --exclusive" process */
3463 wake_requests(rbd_dev, false);
3464 }
3465 } else if (ret < 0) {
3466 rbd_warn(rbd_dev, "error requesting lock: %d", ret);
3467 mod_delayed_work(rbd_dev->task_wq, &rbd_dev->lock_dwork,
3468 RBD_RETRY_DELAY);
3469 } else {
3470 /*
3471 * lock owner acked, but resend if we don't see them
3472 * release the lock
3473 */
3474 dout("%s rbd_dev %p requeueing lock_dwork\n", __func__,
3475 rbd_dev);
3476 mod_delayed_work(rbd_dev->task_wq, &rbd_dev->lock_dwork,
3477 msecs_to_jiffies(2 * RBD_NOTIFY_TIMEOUT * MSEC_PER_SEC));
3478 }
3479}
3480
3481/*
3482 * lock_rwsem must be held for write
3483 */
3484static bool rbd_release_lock(struct rbd_device *rbd_dev)
3485{
3486 dout("%s rbd_dev %p read lock_state %d\n", __func__, rbd_dev,
3487 rbd_dev->lock_state);
3488 if (rbd_dev->lock_state != RBD_LOCK_STATE_LOCKED)
3489 return false;
3490
3491 rbd_dev->lock_state = RBD_LOCK_STATE_RELEASING;
3492 downgrade_write(&rbd_dev->lock_rwsem);
3493 /*
3494 * Ensure that all in-flight IO is flushed.
3495 *
3496 * FIXME: ceph_osdc_sync() flushes the entire OSD client, which
3497 * may be shared with other devices.
3498 */
3499 ceph_osdc_sync(&rbd_dev->rbd_client->client->osdc);
3500 up_read(&rbd_dev->lock_rwsem);
3501
3502 down_write(&rbd_dev->lock_rwsem);
3503 dout("%s rbd_dev %p write lock_state %d\n", __func__, rbd_dev,
3504 rbd_dev->lock_state);
3505 if (rbd_dev->lock_state != RBD_LOCK_STATE_RELEASING)
3506 return false;
3507
3508 rbd_unlock(rbd_dev);
3509 /*
3510 * Give others a chance to grab the lock - we would re-acquire
3511 * almost immediately if we got new IO during ceph_osdc_sync()
3512 * otherwise. We need to ack our own notifications, so this
3513 * lock_dwork will be requeued from rbd_wait_state_locked()
3514 * after wake_requests() in rbd_handle_released_lock().
3515 */
3516 cancel_delayed_work(&rbd_dev->lock_dwork);
3517 return true;
3518}
3519
3520static void rbd_release_lock_work(struct work_struct *work)
3521{
3522 struct rbd_device *rbd_dev = container_of(work, struct rbd_device,
3523 unlock_work);
3524
3525 down_write(&rbd_dev->lock_rwsem);
3526 rbd_release_lock(rbd_dev);
3527 up_write(&rbd_dev->lock_rwsem);
3528}
3529
3530static void rbd_handle_acquired_lock(struct rbd_device *rbd_dev, u8 struct_v,
3531 void **p)
3532{
3533 struct rbd_client_id cid = { 0 };
3534
3535 if (struct_v >= 2) {
3536 cid.gid = ceph_decode_64(p);
3537 cid.handle = ceph_decode_64(p);
3538 }
3539
3540 dout("%s rbd_dev %p cid %llu-%llu\n", __func__, rbd_dev, cid.gid,
3541 cid.handle);
3542 if (!rbd_cid_equal(&cid, &rbd_empty_cid)) {
3543 down_write(&rbd_dev->lock_rwsem);
3544 if (rbd_cid_equal(&cid, &rbd_dev->owner_cid)) {
3545 /*
3546 * we already know that the remote client is
3547 * the owner
3548 */
3549 up_write(&rbd_dev->lock_rwsem);
3550 return;
3551 }
3552
3553 rbd_set_owner_cid(rbd_dev, &cid);
3554 downgrade_write(&rbd_dev->lock_rwsem);
3555 } else {
3556 down_read(&rbd_dev->lock_rwsem);
3557 }
3558
3559 if (!__rbd_is_lock_owner(rbd_dev))
3560 wake_requests(rbd_dev, false);
3561 up_read(&rbd_dev->lock_rwsem);
3562}
3563
3564static void rbd_handle_released_lock(struct rbd_device *rbd_dev, u8 struct_v,
3565 void **p)
3566{
3567 struct rbd_client_id cid = { 0 };
3568
3569 if (struct_v >= 2) {
3570 cid.gid = ceph_decode_64(p);
3571 cid.handle = ceph_decode_64(p);
3572 }
3573
3574 dout("%s rbd_dev %p cid %llu-%llu\n", __func__, rbd_dev, cid.gid,
3575 cid.handle);
3576 if (!rbd_cid_equal(&cid, &rbd_empty_cid)) {
3577 down_write(&rbd_dev->lock_rwsem);
3578 if (!rbd_cid_equal(&cid, &rbd_dev->owner_cid)) {
3579 dout("%s rbd_dev %p unexpected owner, cid %llu-%llu != owner_cid %llu-%llu\n",
3580 __func__, rbd_dev, cid.gid, cid.handle,
3581 rbd_dev->owner_cid.gid, rbd_dev->owner_cid.handle);
3582 up_write(&rbd_dev->lock_rwsem);
3583 return;
3584 }
3585
3586 rbd_set_owner_cid(rbd_dev, &rbd_empty_cid);
3587 downgrade_write(&rbd_dev->lock_rwsem);
3588 } else {
3589 down_read(&rbd_dev->lock_rwsem);
3590 }
3591
3592 if (!__rbd_is_lock_owner(rbd_dev))
3593 wake_requests(rbd_dev, false);
3594 up_read(&rbd_dev->lock_rwsem);
3595}
3596
3597/*
3598 * Returns result for ResponseMessage to be encoded (<= 0), or 1 if no
3599 * ResponseMessage is needed.
3600 */
3601static int rbd_handle_request_lock(struct rbd_device *rbd_dev, u8 struct_v,
3602 void **p)
3603{
3604 struct rbd_client_id my_cid = rbd_get_cid(rbd_dev);
3605 struct rbd_client_id cid = { 0 };
3606 int result = 1;
3607
3608 if (struct_v >= 2) {
3609 cid.gid = ceph_decode_64(p);
3610 cid.handle = ceph_decode_64(p);
3611 }
3612
3613 dout("%s rbd_dev %p cid %llu-%llu\n", __func__, rbd_dev, cid.gid,
3614 cid.handle);
3615 if (rbd_cid_equal(&cid, &my_cid))
3616 return result;
3617
3618 down_read(&rbd_dev->lock_rwsem);
3619 if (__rbd_is_lock_owner(rbd_dev)) {
3620 if (rbd_dev->lock_state == RBD_LOCK_STATE_LOCKED &&
3621 rbd_cid_equal(&rbd_dev->owner_cid, &rbd_empty_cid))
3622 goto out_unlock;
3623
3624 /*
3625 * encode ResponseMessage(0) so the peer can detect
3626 * a missing owner
3627 */
3628 result = 0;
3629
3630 if (rbd_dev->lock_state == RBD_LOCK_STATE_LOCKED) {
3631 if (!rbd_dev->opts->exclusive) {
3632 dout("%s rbd_dev %p queueing unlock_work\n",
3633 __func__, rbd_dev);
3634 queue_work(rbd_dev->task_wq,
3635 &rbd_dev->unlock_work);
3636 } else {
3637 /* refuse to release the lock */
3638 result = -EROFS;
3639 }
3640 }
3641 }
3642
3643out_unlock:
3644 up_read(&rbd_dev->lock_rwsem);
3645 return result;
3646}
3647
3648static void __rbd_acknowledge_notify(struct rbd_device *rbd_dev,
3649 u64 notify_id, u64 cookie, s32 *result)
3650{
3651 struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc;
3652 int buf_size = 4 + CEPH_ENCODING_START_BLK_LEN;
3653 char buf[buf_size];
3654 int ret;
3655
3656 if (result) {
3657 void *p = buf;
3658
3659 /* encode ResponseMessage */
3660 ceph_start_encoding(&p, 1, 1,
3661 buf_size - CEPH_ENCODING_START_BLK_LEN);
3662 ceph_encode_32(&p, *result);
3663 } else {
3664 buf_size = 0;
3665 }
3666
3667 ret = ceph_osdc_notify_ack(osdc, &rbd_dev->header_oid,
3668 &rbd_dev->header_oloc, notify_id, cookie,
3669 buf, buf_size);
3670 if (ret)
3671 rbd_warn(rbd_dev, "acknowledge_notify failed: %d", ret);
3672}
3673
3674static void rbd_acknowledge_notify(struct rbd_device *rbd_dev, u64 notify_id,
3675 u64 cookie)
3676{
3677 dout("%s rbd_dev %p\n", __func__, rbd_dev);
3678 __rbd_acknowledge_notify(rbd_dev, notify_id, cookie, NULL);
3679}
3680
3681static void rbd_acknowledge_notify_result(struct rbd_device *rbd_dev,
3682 u64 notify_id, u64 cookie, s32 result)
3683{
3684 dout("%s rbd_dev %p result %d\n", __func__, rbd_dev, result);
3685 __rbd_acknowledge_notify(rbd_dev, notify_id, cookie, &result);
3686}
3687
3688static void rbd_watch_cb(void *arg, u64 notify_id, u64 cookie,
3689 u64 notifier_id, void *data, size_t data_len)
3690{
3691 struct rbd_device *rbd_dev = arg;
3692 void *p = data;
3693 void *const end = p + data_len;
3694 u8 struct_v = 0;
3695 u32 len;
3696 u32 notify_op;
3697 int ret;
3698
3699 dout("%s rbd_dev %p cookie %llu notify_id %llu data_len %zu\n",
3700 __func__, rbd_dev, cookie, notify_id, data_len);
3701 if (data_len) {
3702 ret = ceph_start_decoding(&p, end, 1, "NotifyMessage",
3703 &struct_v, &len);
3704 if (ret) {
3705 rbd_warn(rbd_dev, "failed to decode NotifyMessage: %d",
3706 ret);
3707 return;
3708 }
3709
3710 notify_op = ceph_decode_32(&p);
3711 } else {
3712 /* legacy notification for header updates */
3713 notify_op = RBD_NOTIFY_OP_HEADER_UPDATE;
3714 len = 0;
3715 }
3716
3717 dout("%s rbd_dev %p notify_op %u\n", __func__, rbd_dev, notify_op);
3718 switch (notify_op) {
3719 case RBD_NOTIFY_OP_ACQUIRED_LOCK:
3720 rbd_handle_acquired_lock(rbd_dev, struct_v, &p);
3721 rbd_acknowledge_notify(rbd_dev, notify_id, cookie);
3722 break;
3723 case RBD_NOTIFY_OP_RELEASED_LOCK:
3724 rbd_handle_released_lock(rbd_dev, struct_v, &p);
3725 rbd_acknowledge_notify(rbd_dev, notify_id, cookie);
3726 break;
3727 case RBD_NOTIFY_OP_REQUEST_LOCK:
3728 ret = rbd_handle_request_lock(rbd_dev, struct_v, &p);
3729 if (ret <= 0)
3730 rbd_acknowledge_notify_result(rbd_dev, notify_id,
3731 cookie, ret);
3732 else
3733 rbd_acknowledge_notify(rbd_dev, notify_id, cookie);
3734 break;
3735 case RBD_NOTIFY_OP_HEADER_UPDATE:
3736 ret = rbd_dev_refresh(rbd_dev);
3737 if (ret)
3738 rbd_warn(rbd_dev, "refresh failed: %d", ret);
3739
3740 rbd_acknowledge_notify(rbd_dev, notify_id, cookie);
3741 break;
3742 default:
3743 if (rbd_is_lock_owner(rbd_dev))
3744 rbd_acknowledge_notify_result(rbd_dev, notify_id,
3745 cookie, -EOPNOTSUPP);
3746 else
3747 rbd_acknowledge_notify(rbd_dev, notify_id, cookie);
3748 break;
3749 }
3750}
3751
3752static void __rbd_unregister_watch(struct rbd_device *rbd_dev);
3753
3754static void rbd_watch_errcb(void *arg, u64 cookie, int err)
3755{
3756 struct rbd_device *rbd_dev = arg;
3757
3758 rbd_warn(rbd_dev, "encountered watch error: %d", err);
3759
3760 down_write(&rbd_dev->lock_rwsem);
3761 rbd_set_owner_cid(rbd_dev, &rbd_empty_cid);
3762 up_write(&rbd_dev->lock_rwsem);
3763
3764 mutex_lock(&rbd_dev->watch_mutex);
3765 if (rbd_dev->watch_state == RBD_WATCH_STATE_REGISTERED) {
3766 __rbd_unregister_watch(rbd_dev);
3767 rbd_dev->watch_state = RBD_WATCH_STATE_ERROR;
3768
3769 queue_delayed_work(rbd_dev->task_wq, &rbd_dev->watch_dwork, 0);
3770 }
3771 mutex_unlock(&rbd_dev->watch_mutex);
3772}
3773
3774/*
3775 * watch_mutex must be locked
3776 */
3777static int __rbd_register_watch(struct rbd_device *rbd_dev)
3778{
3779 struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc;
3780 struct ceph_osd_linger_request *handle;
3781
3782 rbd_assert(!rbd_dev->watch_handle);
3783 dout("%s rbd_dev %p\n", __func__, rbd_dev);
3784
3785 handle = ceph_osdc_watch(osdc, &rbd_dev->header_oid,
3786 &rbd_dev->header_oloc, rbd_watch_cb,
3787 rbd_watch_errcb, rbd_dev);
3788 if (IS_ERR(handle))
3789 return PTR_ERR(handle);
3790
3791 rbd_dev->watch_handle = handle;
3792 return 0;
3793}
3794
3795/*
3796 * watch_mutex must be locked
3797 */
3798static void __rbd_unregister_watch(struct rbd_device *rbd_dev)
3799{
3800 struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc;
3801 int ret;
3802
3803 rbd_assert(rbd_dev->watch_handle);
3804 dout("%s rbd_dev %p\n", __func__, rbd_dev);
3805
3806 ret = ceph_osdc_unwatch(osdc, rbd_dev->watch_handle);
3807 if (ret)
3808 rbd_warn(rbd_dev, "failed to unwatch: %d", ret);
3809
3810 rbd_dev->watch_handle = NULL;
3811}
3812
3813static int rbd_register_watch(struct rbd_device *rbd_dev)
3814{
3815 int ret;
3816
3817 mutex_lock(&rbd_dev->watch_mutex);
3818 rbd_assert(rbd_dev->watch_state == RBD_WATCH_STATE_UNREGISTERED);
3819 ret = __rbd_register_watch(rbd_dev);
3820 if (ret)
3821 goto out;
3822
3823 rbd_dev->watch_state = RBD_WATCH_STATE_REGISTERED;
3824 rbd_dev->watch_cookie = rbd_dev->watch_handle->linger_id;
3825
3826out:
3827 mutex_unlock(&rbd_dev->watch_mutex);
3828 return ret;
3829}
3830
3831static void cancel_tasks_sync(struct rbd_device *rbd_dev)
3832{
3833 dout("%s rbd_dev %p\n", __func__, rbd_dev);
3834
3835 cancel_work_sync(&rbd_dev->acquired_lock_work);
3836 cancel_work_sync(&rbd_dev->released_lock_work);
3837 cancel_delayed_work_sync(&rbd_dev->lock_dwork);
3838 cancel_work_sync(&rbd_dev->unlock_work);
3839}
3840
3841/*
3842 * header_rwsem must not be held to avoid a deadlock with
3843 * rbd_dev_refresh() when flushing notifies.
3844 */
3845static void rbd_unregister_watch(struct rbd_device *rbd_dev)
3846{
3847 WARN_ON(waitqueue_active(&rbd_dev->lock_waitq));
3848 cancel_tasks_sync(rbd_dev);
3849
3850 mutex_lock(&rbd_dev->watch_mutex);
3851 if (rbd_dev->watch_state == RBD_WATCH_STATE_REGISTERED)
3852 __rbd_unregister_watch(rbd_dev);
3853 rbd_dev->watch_state = RBD_WATCH_STATE_UNREGISTERED;
3854 mutex_unlock(&rbd_dev->watch_mutex);
3855
3856 cancel_delayed_work_sync(&rbd_dev->watch_dwork);
3857 ceph_osdc_flush_notifies(&rbd_dev->rbd_client->client->osdc);
3858}
3859
3860/*
3861 * lock_rwsem must be held for write
3862 */
3863static void rbd_reacquire_lock(struct rbd_device *rbd_dev)
3864{
3865 struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc;
3866 char cookie[32];
3867 int ret;
3868
3869 WARN_ON(rbd_dev->lock_state != RBD_LOCK_STATE_LOCKED);
3870
3871 format_lock_cookie(rbd_dev, cookie);
3872 ret = ceph_cls_set_cookie(osdc, &rbd_dev->header_oid,
3873 &rbd_dev->header_oloc, RBD_LOCK_NAME,
3874 CEPH_CLS_LOCK_EXCLUSIVE, rbd_dev->lock_cookie,
3875 RBD_LOCK_TAG, cookie);
3876 if (ret) {
3877 if (ret != -EOPNOTSUPP)
3878 rbd_warn(rbd_dev, "failed to update lock cookie: %d",
3879 ret);
3880
3881 /*
3882 * Lock cookie cannot be updated on older OSDs, so do
3883 * a manual release and queue an acquire.
3884 */
3885 if (rbd_release_lock(rbd_dev))
3886 queue_delayed_work(rbd_dev->task_wq,
3887 &rbd_dev->lock_dwork, 0);
3888 } else {
3889 __rbd_lock(rbd_dev, cookie);
3890 }
3891}
3892
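/*
 * Delayed work callback: re-establish the watch after an error.  If we
 * were blacklisted, mark the device and wake up anyone waiting on the
 * lock; on other failures simply retry after RBD_RETRY_DELAY.
 */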
3893static void rbd_reregister_watch(struct work_struct *work)
3894{
3895 struct rbd_device *rbd_dev = container_of(to_delayed_work(work),
3896 struct rbd_device, watch_dwork);
3897 int ret;
3898
3899 dout("%s rbd_dev %p\n", __func__, rbd_dev);
3900
3901 mutex_lock(&rbd_dev->watch_mutex);
3902 if (rbd_dev->watch_state != RBD_WATCH_STATE_ERROR) {
3903 mutex_unlock(&rbd_dev->watch_mutex);
3904 return;
3905 }
3906
3907 ret = __rbd_register_watch(rbd_dev);
3908 if (ret) {
3909 rbd_warn(rbd_dev, "failed to reregister watch: %d", ret);
3910 if (ret == -EBLACKLISTED || ret == -ENOENT) {
3911 set_bit(RBD_DEV_FLAG_BLACKLISTED, &rbd_dev->flags);
3912 wake_requests(rbd_dev, true);
3913 } else {
3914 queue_delayed_work(rbd_dev->task_wq,
3915 &rbd_dev->watch_dwork,
3916 RBD_RETRY_DELAY);
3917 }
3918 mutex_unlock(&rbd_dev->watch_mutex);
3919 return;
3920 }
3921
3922 rbd_dev->watch_state = RBD_WATCH_STATE_REGISTERED;
3923 rbd_dev->watch_cookie = rbd_dev->watch_handle->linger_id;
3924 mutex_unlock(&rbd_dev->watch_mutex);
3925
3926 down_write(&rbd_dev->lock_rwsem);
3927 if (rbd_dev->lock_state == RBD_LOCK_STATE_LOCKED)
3928 rbd_reacquire_lock(rbd_dev);
3929 up_write(&rbd_dev->lock_rwsem);
3930
3931 ret = rbd_dev_refresh(rbd_dev);
3932 if (ret)
3933		rbd_warn(rbd_dev, "reregistration refresh failed: %d", ret);
3934}
3935
3936/*
3937 * Synchronous osd object method call.  Returns the number of bytes
3938 * returned in the inbound (reply) buffer, or a negative error code.
3939 */
3940static int rbd_obj_method_sync(struct rbd_device *rbd_dev,
3941 struct ceph_object_id *oid,
3942 struct ceph_object_locator *oloc,
3943 const char *method_name,
3944 const void *outbound,
3945 size_t outbound_size,
3946 void *inbound,
3947 size_t inbound_size)
3948{
3949 struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc;
3950 struct page *req_page = NULL;
3951 struct page *reply_page;
3952 int ret;
3953
3954 /*
3955	 * Method calls are ultimately read operations.  The result
3956	 * should be placed into the inbound buffer provided.  Callers
3957	 * may also supply outbound data--parameters for the object
3958	 * method.  Currently, if such data is present, it will be a
3959	 * snapshot id.
3960 */
3961 if (outbound) {
3962 if (outbound_size > PAGE_SIZE)
3963 return -E2BIG;
3964
3965 req_page = alloc_page(GFP_KERNEL);
3966 if (!req_page)
3967 return -ENOMEM;
3968
3969 memcpy(page_address(req_page), outbound, outbound_size);
3970 }
3971
3972 reply_page = alloc_page(GFP_KERNEL);
3973 if (!reply_page) {
3974 if (req_page)
3975 __free_page(req_page);
3976 return -ENOMEM;
3977 }
3978
3979 ret = ceph_osdc_call(osdc, oid, oloc, RBD_DRV_NAME, method_name,
3980 CEPH_OSD_FLAG_READ, req_page, outbound_size,
3981 reply_page, &inbound_size);
3982 if (!ret) {
3983 memcpy(inbound, page_address(reply_page), inbound_size);
3984 ret = inbound_size;
3985 }
3986
3987 if (req_page)
3988 __free_page(req_page);
3989 __free_page(reply_page);
3990 return ret;
3991}
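
/*
 * For example, fetching the size of the base image (see
 * _rbd_dev_v2_snap_size() below for the real thing) boils down to:
 *
 *	__le64 snapid = cpu_to_le64(CEPH_NOSNAP);
 *
 *	ret = rbd_obj_method_sync(rbd_dev, &rbd_dev->header_oid,
 *				  &rbd_dev->header_oloc, "get_size",
 *				  &snapid, sizeof(snapid),
 *				  &size_buf, sizeof(size_buf));
 */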
3992
3993/*
3994 * lock_rwsem must be held for read
3995 */
3996static void rbd_wait_state_locked(struct rbd_device *rbd_dev)
3997{
3998 DEFINE_WAIT(wait);
3999
4000 do {
4001 /*
4002 * Note the use of mod_delayed_work() in rbd_acquire_lock()
4003 * and cancel_delayed_work() in wake_requests().
4004 */
4005 dout("%s rbd_dev %p queueing lock_dwork\n", __func__, rbd_dev);
4006 queue_delayed_work(rbd_dev->task_wq, &rbd_dev->lock_dwork, 0);
4007 prepare_to_wait_exclusive(&rbd_dev->lock_waitq, &wait,
4008 TASK_UNINTERRUPTIBLE);
4009 up_read(&rbd_dev->lock_rwsem);
4010 schedule();
4011 down_read(&rbd_dev->lock_rwsem);
4012 } while (rbd_dev->lock_state != RBD_LOCK_STATE_LOCKED &&
4013 !test_bit(RBD_DEV_FLAG_BLACKLISTED, &rbd_dev->flags));
4014
4015 finish_wait(&rbd_dev->lock_waitq, &wait);
4016}
4017
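/*
 * Per-request work function.  Requests are bounced to a workqueue
 * because handling them may block, e.g. in rbd_wait_state_locked()
 * while waiting for the exclusive lock.
 */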
4018static void rbd_queue_workfn(struct work_struct *work)
4019{
4020 struct request *rq = blk_mq_rq_from_pdu(work);
4021 struct rbd_device *rbd_dev = rq->q->queuedata;
4022 struct rbd_img_request *img_request;
4023 struct ceph_snap_context *snapc = NULL;
4024 u64 offset = (u64)blk_rq_pos(rq) << SECTOR_SHIFT;
4025 u64 length = blk_rq_bytes(rq);
4026 enum obj_operation_type op_type;
4027 u64 mapping_size;
4028 bool must_be_locked;
4029 int result;
4030
4031 switch (req_op(rq)) {
4032 case REQ_OP_DISCARD:
4033 case REQ_OP_WRITE_ZEROES:
4034 op_type = OBJ_OP_DISCARD;
4035 break;
4036 case REQ_OP_WRITE:
4037 op_type = OBJ_OP_WRITE;
4038 break;
4039 case REQ_OP_READ:
4040 op_type = OBJ_OP_READ;
4041 break;
4042 default:
4043 dout("%s: non-fs request type %d\n", __func__, req_op(rq));
4044 result = -EIO;
4045 goto err;
4046 }
4047
4048 /* Ignore/skip any zero-length requests */
4049
4050 if (!length) {
4051 dout("%s: zero-length request\n", __func__);
4052 result = 0;
4053 goto err_rq;
4054 }
4055
4056 /* Only reads are allowed to a read-only device */
4057
4058 if (op_type != OBJ_OP_READ) {
4059 if (rbd_dev->mapping.read_only) {
4060 result = -EROFS;
4061 goto err_rq;
4062 }
4063 rbd_assert(rbd_dev->spec->snap_id == CEPH_NOSNAP);
4064 }
4065
4066 /*
4067 * Quit early if the mapped snapshot no longer exists. It's
4068 * still possible the snapshot will have disappeared by the
4069 * time our request arrives at the osd, but there's no sense in
4070 * sending it if we already know.
4071 */
4072 if (!test_bit(RBD_DEV_FLAG_EXISTS, &rbd_dev->flags)) {
4073		dout("request for non-existent snapshot\n");
4074 rbd_assert(rbd_dev->spec->snap_id != CEPH_NOSNAP);
4075 result = -ENXIO;
4076 goto err_rq;
4077 }
4078
4079 if (offset && length > U64_MAX - offset + 1) {
4080 rbd_warn(rbd_dev, "bad request range (%llu~%llu)", offset,
4081 length);
4082 result = -EINVAL;
4083 goto err_rq; /* Shouldn't happen */
4084 }
4085
4086 blk_mq_start_request(rq);
4087
4088 down_read(&rbd_dev->header_rwsem);
4089 mapping_size = rbd_dev->mapping.size;
4090 if (op_type != OBJ_OP_READ) {
4091 snapc = rbd_dev->header.snapc;
4092 ceph_get_snap_context(snapc);
4093 }
4094 up_read(&rbd_dev->header_rwsem);
4095
4096 if (offset + length > mapping_size) {
4097 rbd_warn(rbd_dev, "beyond EOD (%llu~%llu > %llu)", offset,
4098 length, mapping_size);
4099 result = -EIO;
4100 goto err_rq;
4101 }
4102
4103 must_be_locked =
4104 (rbd_dev->header.features & RBD_FEATURE_EXCLUSIVE_LOCK) &&
4105 (op_type != OBJ_OP_READ || rbd_dev->opts->lock_on_read);
4106 if (must_be_locked) {
4107 down_read(&rbd_dev->lock_rwsem);
4108 if (rbd_dev->lock_state != RBD_LOCK_STATE_LOCKED &&
4109 !test_bit(RBD_DEV_FLAG_BLACKLISTED, &rbd_dev->flags)) {
4110 if (rbd_dev->opts->exclusive) {
4111 rbd_warn(rbd_dev, "exclusive lock required");
4112 result = -EROFS;
4113 goto err_unlock;
4114 }
4115 rbd_wait_state_locked(rbd_dev);
4116 }
4117 if (test_bit(RBD_DEV_FLAG_BLACKLISTED, &rbd_dev->flags)) {
4118 result = -EBLACKLISTED;
4119 goto err_unlock;
4120 }
4121 }
4122
4123 img_request = rbd_img_request_create(rbd_dev, offset, length, op_type,
4124 snapc);
4125 if (!img_request) {
4126 result = -ENOMEM;
4127 goto err_unlock;
4128 }
4129 img_request->rq = rq;
4130 snapc = NULL; /* img_request consumes a ref */
4131
4132 if (op_type == OBJ_OP_DISCARD)
4133 result = rbd_img_request_fill(img_request, OBJ_REQUEST_NODATA,
4134 NULL);
4135 else
4136 result = rbd_img_request_fill(img_request, OBJ_REQUEST_BIO,
4137 rq->bio);
4138 if (result)
4139 goto err_img_request;
4140
4141 result = rbd_img_request_submit(img_request);
4142 if (result)
4143 goto err_img_request;
4144
4145 if (must_be_locked)
4146 up_read(&rbd_dev->lock_rwsem);
4147 return;
4148
4149err_img_request:
4150 rbd_img_request_put(img_request);
4151err_unlock:
4152 if (must_be_locked)
4153 up_read(&rbd_dev->lock_rwsem);
4154err_rq:
4155 if (result)
4156 rbd_warn(rbd_dev, "%s %llx at %llx result %d",
4157 obj_op_name(op_type), length, offset, result);
4158 ceph_put_snap_context(snapc);
4159err:
4160 blk_mq_end_request(rq, errno_to_blk_status(result));
4161}
4162
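/* blk-mq ->queue_rq(): hand the request off to the rbd workqueue */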
4163static blk_status_t rbd_queue_rq(struct blk_mq_hw_ctx *hctx,
4164 const struct blk_mq_queue_data *bd)
4165{
4166 struct request *rq = bd->rq;
4167 struct work_struct *work = blk_mq_rq_to_pdu(rq);
4168
4169 queue_work(rbd_wq, work);
4170 return BLK_STS_OK;
4171}
4172
4173static void rbd_free_disk(struct rbd_device *rbd_dev)
4174{
4175 blk_cleanup_queue(rbd_dev->disk->queue);
4176 blk_mq_free_tag_set(&rbd_dev->tag_set);
4177 put_disk(rbd_dev->disk);
4178 rbd_dev->disk = NULL;
4179}
4180
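/*
 * Synchronously read up to buf_len bytes from the start of an object
 * into buf.  Returns the number of bytes read, or a negative error
 * code.
 */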
4181static int rbd_obj_read_sync(struct rbd_device *rbd_dev,
4182 struct ceph_object_id *oid,
4183 struct ceph_object_locator *oloc,
4184 void *buf, int buf_len)
4186{
4187 struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc;
4188 struct ceph_osd_request *req;
4189 struct page **pages;
4190 int num_pages = calc_pages_for(0, buf_len);
4191 int ret;
4192
4193 req = ceph_osdc_alloc_request(osdc, NULL, 1, false, GFP_KERNEL);
4194 if (!req)
4195 return -ENOMEM;
4196
4197 ceph_oid_copy(&req->r_base_oid, oid);
4198 ceph_oloc_copy(&req->r_base_oloc, oloc);
4199 req->r_flags = CEPH_OSD_FLAG_READ;
4200
4201 ret = ceph_osdc_alloc_messages(req, GFP_KERNEL);
4202 if (ret)
4203 goto out_req;
4204
4205 pages = ceph_alloc_page_vector(num_pages, GFP_KERNEL);
4206 if (IS_ERR(pages)) {
4207 ret = PTR_ERR(pages);
4208 goto out_req;
4209 }
4210
4211 osd_req_op_extent_init(req, 0, CEPH_OSD_OP_READ, 0, buf_len, 0, 0);
4212 osd_req_op_extent_osd_data_pages(req, 0, pages, buf_len, 0, false,
4213 true);
4214
4215 ceph_osdc_start_request(osdc, req, false);
4216 ret = ceph_osdc_wait_request(osdc, req);
4217 if (ret >= 0)
4218 ceph_copy_from_page_vector(pages, buf, 0, ret);
4219
4220out_req:
4221 ceph_osdc_put_request(req);
4222 return ret;
4223}
4224
4225/*
4226 * Read the complete header for the given rbd device. On successful
4227 * return, the rbd_dev->header field will contain up-to-date
4228 * information about the image.
4229 */
4230static int rbd_dev_v1_header_info(struct rbd_device *rbd_dev)
4231{
4232 struct rbd_image_header_ondisk *ondisk = NULL;
4233 u32 snap_count = 0;
4234 u64 names_size = 0;
4235 u32 want_count;
4236 int ret;
4237
4238 /*
4239 * The complete header will include an array of its 64-bit
4240 * snapshot ids, followed by the names of those snapshots as
4241 * a contiguous block of NUL-terminated strings. Note that
4242 * the number of snapshots could change by the time we read
4243 * it in, in which case we re-read it.
4244 */
4245 do {
4246 size_t size;
4247
4248 kfree(ondisk);
4249
4250 size = sizeof (*ondisk);
4251 size += snap_count * sizeof (struct rbd_image_snap_ondisk);
4252 size += names_size;
4253 ondisk = kmalloc(size, GFP_KERNEL);
4254 if (!ondisk)
4255 return -ENOMEM;
4256
4257 ret = rbd_obj_read_sync(rbd_dev, &rbd_dev->header_oid,
4258 &rbd_dev->header_oloc, ondisk, size);
4259 if (ret < 0)
4260 goto out;
4261 if ((size_t)ret < size) {
4262 ret = -ENXIO;
4263			rbd_warn(rbd_dev, "short header read (want %zu got %d)",
4264 size, ret);
4265 goto out;
4266 }
4267 if (!rbd_dev_ondisk_valid(ondisk)) {
4268 ret = -ENXIO;
4269 rbd_warn(rbd_dev, "invalid header");
4270 goto out;
4271 }
4272
4273 names_size = le64_to_cpu(ondisk->snap_names_len);
4274 want_count = snap_count;
4275 snap_count = le32_to_cpu(ondisk->snap_count);
4276 } while (snap_count != want_count);
4277
4278 ret = rbd_header_from_disk(rbd_dev, ondisk);
4279out:
4280 kfree(ondisk);
4281
4282 return ret;
4283}
4284
4285/*
4286 * Clear the rbd device's EXISTS flag if the snapshot it's mapped to
4287 * has disappeared from the (just updated) snapshot context.
4288 */
4289static void rbd_exists_validate(struct rbd_device *rbd_dev)
4290{
4291 u64 snap_id;
4292
4293 if (!test_bit(RBD_DEV_FLAG_EXISTS, &rbd_dev->flags))
4294 return;
4295
4296 snap_id = rbd_dev->spec->snap_id;
4297 if (snap_id == CEPH_NOSNAP)
4298 return;
4299
4300 if (rbd_dev_snap_index(rbd_dev, snap_id) == BAD_SNAP_INDEX)
4301 clear_bit(RBD_DEV_FLAG_EXISTS, &rbd_dev->flags);
4302}
4303
4304static void rbd_dev_update_size(struct rbd_device *rbd_dev)
4305{
4306 sector_t size;
4307
4308 /*
4309 * If EXISTS is not set, rbd_dev->disk may be NULL, so don't
4310 * try to update its size. If REMOVING is set, updating size
4311 * is just useless work since the device can't be opened.
4312 */
4313 if (test_bit(RBD_DEV_FLAG_EXISTS, &rbd_dev->flags) &&
4314 !test_bit(RBD_DEV_FLAG_REMOVING, &rbd_dev->flags)) {
4315 size = (sector_t)rbd_dev->mapping.size / SECTOR_SIZE;
4316 dout("setting size to %llu sectors", (unsigned long long)size);
4317 set_capacity(rbd_dev->disk, size);
4318 revalidate_disk(rbd_dev->disk);
4319 }
4320}
4321
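/*
 * Re-read the image header and propagate any resulting size change to
 * the block device.  Called on header update notifications and for
 * writes to the sysfs "refresh" attribute.
 */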
4322static int rbd_dev_refresh(struct rbd_device *rbd_dev)
4323{
4324 u64 mapping_size;
4325 int ret;
4326
4327 down_write(&rbd_dev->header_rwsem);
4328 mapping_size = rbd_dev->mapping.size;
4329
4330 ret = rbd_dev_header_info(rbd_dev);
4331 if (ret)
4332 goto out;
4333
4334 /*
4335 * If there is a parent, see if it has disappeared due to the
4336 * mapped image getting flattened.
4337 */
4338 if (rbd_dev->parent) {
4339 ret = rbd_dev_v2_parent_info(rbd_dev);
4340 if (ret)
4341 goto out;
4342 }
4343
4344 if (rbd_dev->spec->snap_id == CEPH_NOSNAP) {
4345 rbd_dev->mapping.size = rbd_dev->header.image_size;
4346 } else {
4347 /* validate mapped snapshot's EXISTS flag */
4348 rbd_exists_validate(rbd_dev);
4349 }
4350
4351out:
4352 up_write(&rbd_dev->header_rwsem);
4353 if (!ret && mapping_size != rbd_dev->mapping.size)
4354 rbd_dev_update_size(rbd_dev);
4355
4356 return ret;
4357}
4358
4359static int rbd_init_request(struct blk_mq_tag_set *set, struct request *rq,
4360 unsigned int hctx_idx, unsigned int numa_node)
4361{
4362 struct work_struct *work = blk_mq_rq_to_pdu(rq);
4363
4364 INIT_WORK(work, rbd_queue_workfn);
4365 return 0;
4366}
4367
4368static const struct blk_mq_ops rbd_mq_ops = {
4369 .queue_rq = rbd_queue_rq,
4370 .init_request = rbd_init_request,
4371};
4372
4373static int rbd_init_disk(struct rbd_device *rbd_dev)
4374{
4375 struct gendisk *disk;
4376 struct request_queue *q;
4377 u64 segment_size;
4378 int err;
4379
4380 /* create gendisk info */
4381 disk = alloc_disk(single_major ?
4382 (1 << RBD_SINGLE_MAJOR_PART_SHIFT) :
4383 RBD_MINORS_PER_MAJOR);
4384 if (!disk)
4385 return -ENOMEM;
4386
4387 snprintf(disk->disk_name, sizeof(disk->disk_name), RBD_DRV_NAME "%d",
4388 rbd_dev->dev_id);
4389 disk->major = rbd_dev->major;
4390 disk->first_minor = rbd_dev->minor;
4391 if (single_major)
4392 disk->flags |= GENHD_FL_EXT_DEVT;
4393 disk->fops = &rbd_bd_ops;
4394 disk->private_data = rbd_dev;
4395
4396 memset(&rbd_dev->tag_set, 0, sizeof(rbd_dev->tag_set));
4397 rbd_dev->tag_set.ops = &rbd_mq_ops;
4398 rbd_dev->tag_set.queue_depth = rbd_dev->opts->queue_depth;
4399 rbd_dev->tag_set.numa_node = NUMA_NO_NODE;
4400 rbd_dev->tag_set.flags = BLK_MQ_F_SHOULD_MERGE | BLK_MQ_F_SG_MERGE;
4401 rbd_dev->tag_set.nr_hw_queues = 1;
4402 rbd_dev->tag_set.cmd_size = sizeof(struct work_struct);
4403
4404 err = blk_mq_alloc_tag_set(&rbd_dev->tag_set);
4405 if (err)
4406 goto out_disk;
4407
4408 q = blk_mq_init_queue(&rbd_dev->tag_set);
4409 if (IS_ERR(q)) {
4410 err = PTR_ERR(q);
4411 goto out_tag_set;
4412 }
4413
4414 queue_flag_set_unlocked(QUEUE_FLAG_NONROT, q);
4415 /* QUEUE_FLAG_ADD_RANDOM is off by default for blk-mq */
4416
4417 /* set io sizes to object size */
4418 segment_size = rbd_obj_bytes(&rbd_dev->header);
4419 blk_queue_max_hw_sectors(q, segment_size / SECTOR_SIZE);
4420 q->limits.max_sectors = queue_max_hw_sectors(q);
4421 blk_queue_max_segments(q, USHRT_MAX);
4422 blk_queue_max_segment_size(q, segment_size);
4423 blk_queue_io_min(q, segment_size);
4424 blk_queue_io_opt(q, segment_size);
4425
4426 /* enable the discard support */
4427 queue_flag_set_unlocked(QUEUE_FLAG_DISCARD, q);
4428 q->limits.discard_granularity = segment_size;
4429 q->limits.discard_alignment = segment_size;
4430 blk_queue_max_discard_sectors(q, segment_size / SECTOR_SIZE);
4431 blk_queue_max_write_zeroes_sectors(q, segment_size / SECTOR_SIZE);
4432
4433 if (!ceph_test_opt(rbd_dev->rbd_client->client, NOCRC))
4434 q->backing_dev_info->capabilities |= BDI_CAP_STABLE_WRITES;
4435
4436 /*
4437 * disk_release() expects a queue ref from add_disk() and will
4438 * put it. Hold an extra ref until add_disk() is called.
4439 */
4440 WARN_ON(!blk_get_queue(q));
4441 disk->queue = q;
4442 q->queuedata = rbd_dev;
4443
4444 rbd_dev->disk = disk;
4445
4446 return 0;
4447out_tag_set:
4448 blk_mq_free_tag_set(&rbd_dev->tag_set);
4449out_disk:
4450 put_disk(disk);
4451 return err;
4452}
4453
4454/*
4455 sysfs
4456*/
4457
4458static struct rbd_device *dev_to_rbd_dev(struct device *dev)
4459{
4460 return container_of(dev, struct rbd_device, dev);
4461}
4462
4463static ssize_t rbd_size_show(struct device *dev,
4464 struct device_attribute *attr, char *buf)
4465{
4466 struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
4467
4468 return sprintf(buf, "%llu\n",
4469 (unsigned long long)rbd_dev->mapping.size);
4470}
4471
4472/*
4473 * Note this shows the features for whatever's mapped, which is not
4474 * necessarily the base image.
4475 */
4476static ssize_t rbd_features_show(struct device *dev,
4477 struct device_attribute *attr, char *buf)
4478{
4479 struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
4480
4481 return sprintf(buf, "0x%016llx\n",
4482 (unsigned long long)rbd_dev->mapping.features);
4483}
4484
4485static ssize_t rbd_major_show(struct device *dev,
4486 struct device_attribute *attr, char *buf)
4487{
4488 struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
4489
4490 if (rbd_dev->major)
4491 return sprintf(buf, "%d\n", rbd_dev->major);
4492
4493 return sprintf(buf, "(none)\n");
4494}
4495
4496static ssize_t rbd_minor_show(struct device *dev,
4497 struct device_attribute *attr, char *buf)
4498{
4499 struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
4500
4501 return sprintf(buf, "%d\n", rbd_dev->minor);
4502}
4503
4504static ssize_t rbd_client_addr_show(struct device *dev,
4505 struct device_attribute *attr, char *buf)
4506{
4507 struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
4508 struct ceph_entity_addr *client_addr =
4509 ceph_client_addr(rbd_dev->rbd_client->client);
4510
4511 return sprintf(buf, "%pISpc/%u\n", &client_addr->in_addr,
4512 le32_to_cpu(client_addr->nonce));
4513}
4514
4515static ssize_t rbd_client_id_show(struct device *dev,
4516 struct device_attribute *attr, char *buf)
4517{
4518 struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
4519
4520 return sprintf(buf, "client%lld\n",
4521 ceph_client_gid(rbd_dev->rbd_client->client));
4522}
4523
4524static ssize_t rbd_cluster_fsid_show(struct device *dev,
4525 struct device_attribute *attr, char *buf)
4526{
4527 struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
4528
4529 return sprintf(buf, "%pU\n", &rbd_dev->rbd_client->client->fsid);
4530}
4531
4532static ssize_t rbd_config_info_show(struct device *dev,
4533 struct device_attribute *attr, char *buf)
4534{
4535 struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
4536
4537 if (!capable(CAP_SYS_ADMIN))
4538 return -EPERM;
4539
4540 return sprintf(buf, "%s\n", rbd_dev->config_info);
4541}
4542
4543static ssize_t rbd_pool_show(struct device *dev,
4544 struct device_attribute *attr, char *buf)
4545{
4546 struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
4547
4548 return sprintf(buf, "%s\n", rbd_dev->spec->pool_name);
4549}
4550
4551static ssize_t rbd_pool_id_show(struct device *dev,
4552 struct device_attribute *attr, char *buf)
4553{
4554 struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
4555
4556 return sprintf(buf, "%llu\n",
4557 (unsigned long long) rbd_dev->spec->pool_id);
4558}
4559
4560static ssize_t rbd_name_show(struct device *dev,
4561 struct device_attribute *attr, char *buf)
4562{
4563 struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
4564
4565 if (rbd_dev->spec->image_name)
4566 return sprintf(buf, "%s\n", rbd_dev->spec->image_name);
4567
4568 return sprintf(buf, "(unknown)\n");
4569}
4570
4571static ssize_t rbd_image_id_show(struct device *dev,
4572 struct device_attribute *attr, char *buf)
4573{
4574 struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
4575
4576 return sprintf(buf, "%s\n", rbd_dev->spec->image_id);
4577}
4578
4579/*
4580 * Shows the name of the currently-mapped snapshot (or
4581 * RBD_SNAP_HEAD_NAME for the base image).
4582 */
4583static ssize_t rbd_snap_show(struct device *dev,
4584 struct device_attribute *attr,
4585 char *buf)
4586{
4587 struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
4588
4589 return sprintf(buf, "%s\n", rbd_dev->spec->snap_name);
4590}
4591
4592static ssize_t rbd_snap_id_show(struct device *dev,
4593 struct device_attribute *attr, char *buf)
4594{
4595 struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
4596
4597 return sprintf(buf, "%llu\n", rbd_dev->spec->snap_id);
4598}
4599
4600/*
4601 * For a v2 image, shows the chain of parent images, separated by empty
4602 * lines. For v1 images or if there is no parent, shows "(no parent
4603 * image)".
4604 */
4605static ssize_t rbd_parent_show(struct device *dev,
4606 struct device_attribute *attr,
4607 char *buf)
4608{
4609 struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
4610 ssize_t count = 0;
4611
4612 if (!rbd_dev->parent)
4613 return sprintf(buf, "(no parent image)\n");
4614
4615 for ( ; rbd_dev->parent; rbd_dev = rbd_dev->parent) {
4616 struct rbd_spec *spec = rbd_dev->parent_spec;
4617
4618 count += sprintf(&buf[count], "%s"
4619 "pool_id %llu\npool_name %s\n"
4620 "image_id %s\nimage_name %s\n"
4621 "snap_id %llu\nsnap_name %s\n"
4622 "overlap %llu\n",
4623 !count ? "" : "\n", /* first? */
4624 spec->pool_id, spec->pool_name,
4625 spec->image_id, spec->image_name ?: "(unknown)",
4626 spec->snap_id, spec->snap_name,
4627 rbd_dev->parent_overlap);
4628 }
4629
4630 return count;
4631}
4632
4633static ssize_t rbd_image_refresh(struct device *dev,
4634 struct device_attribute *attr,
4635 const char *buf,
4636 size_t size)
4637{
4638 struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
4639 int ret;
4640
4641 if (!capable(CAP_SYS_ADMIN))
4642 return -EPERM;
4643
4644 ret = rbd_dev_refresh(rbd_dev);
4645 if (ret)
4646 return ret;
4647
4648 return size;
4649}
4650
4651static DEVICE_ATTR(size, S_IRUGO, rbd_size_show, NULL);
4652static DEVICE_ATTR(features, S_IRUGO, rbd_features_show, NULL);
4653static DEVICE_ATTR(major, S_IRUGO, rbd_major_show, NULL);
4654static DEVICE_ATTR(minor, S_IRUGO, rbd_minor_show, NULL);
4655static DEVICE_ATTR(client_addr, S_IRUGO, rbd_client_addr_show, NULL);
4656static DEVICE_ATTR(client_id, S_IRUGO, rbd_client_id_show, NULL);
4657static DEVICE_ATTR(cluster_fsid, S_IRUGO, rbd_cluster_fsid_show, NULL);
4658static DEVICE_ATTR(config_info, S_IRUSR, rbd_config_info_show, NULL);
4659static DEVICE_ATTR(pool, S_IRUGO, rbd_pool_show, NULL);
4660static DEVICE_ATTR(pool_id, S_IRUGO, rbd_pool_id_show, NULL);
4661static DEVICE_ATTR(name, S_IRUGO, rbd_name_show, NULL);
4662static DEVICE_ATTR(image_id, S_IRUGO, rbd_image_id_show, NULL);
4663static DEVICE_ATTR(refresh, S_IWUSR, NULL, rbd_image_refresh);
4664static DEVICE_ATTR(current_snap, S_IRUGO, rbd_snap_show, NULL);
4665static DEVICE_ATTR(snap_id, S_IRUGO, rbd_snap_id_show, NULL);
4666static DEVICE_ATTR(parent, S_IRUGO, rbd_parent_show, NULL);
4667
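/*
 * These attributes appear under /sys/bus/rbd/devices/<dev-id>/ once
 * the device has been registered on the rbd bus.
 */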
4668static struct attribute *rbd_attrs[] = {
4669 &dev_attr_size.attr,
4670 &dev_attr_features.attr,
4671 &dev_attr_major.attr,
4672 &dev_attr_minor.attr,
4673 &dev_attr_client_addr.attr,
4674 &dev_attr_client_id.attr,
4675 &dev_attr_cluster_fsid.attr,
4676 &dev_attr_config_info.attr,
4677 &dev_attr_pool.attr,
4678 &dev_attr_pool_id.attr,
4679 &dev_attr_name.attr,
4680 &dev_attr_image_id.attr,
4681 &dev_attr_current_snap.attr,
4682 &dev_attr_snap_id.attr,
4683 &dev_attr_parent.attr,
4684 &dev_attr_refresh.attr,
4685 NULL
4686};
4687
4688static struct attribute_group rbd_attr_group = {
4689 .attrs = rbd_attrs,
4690};
4691
4692static const struct attribute_group *rbd_attr_groups[] = {
4693 &rbd_attr_group,
4694 NULL
4695};
4696
4697static void rbd_dev_release(struct device *dev);
4698
4699static const struct device_type rbd_device_type = {
4700 .name = "rbd",
4701 .groups = rbd_attr_groups,
4702 .release = rbd_dev_release,
4703};
4704
4705static struct rbd_spec *rbd_spec_get(struct rbd_spec *spec)
4706{
4707 kref_get(&spec->kref);
4708
4709 return spec;
4710}
4711
4712static void rbd_spec_free(struct kref *kref);
4713static void rbd_spec_put(struct rbd_spec *spec)
4714{
4715 if (spec)
4716 kref_put(&spec->kref, rbd_spec_free);
4717}
4718
4719static struct rbd_spec *rbd_spec_alloc(void)
4720{
4721 struct rbd_spec *spec;
4722
4723 spec = kzalloc(sizeof (*spec), GFP_KERNEL);
4724 if (!spec)
4725 return NULL;
4726
4727 spec->pool_id = CEPH_NOPOOL;
4728 spec->snap_id = CEPH_NOSNAP;
4729 kref_init(&spec->kref);
4730
4731 return spec;
4732}
4733
4734static void rbd_spec_free(struct kref *kref)
4735{
4736 struct rbd_spec *spec = container_of(kref, struct rbd_spec, kref);
4737
4738 kfree(spec->pool_name);
4739 kfree(spec->image_id);
4740 kfree(spec->image_name);
4741 kfree(spec->snap_name);
4742 kfree(spec);
4743}
4744
4745static void rbd_dev_free(struct rbd_device *rbd_dev)
4746{
4747 WARN_ON(rbd_dev->watch_state != RBD_WATCH_STATE_UNREGISTERED);
4748 WARN_ON(rbd_dev->lock_state != RBD_LOCK_STATE_UNLOCKED);
4749
4750 ceph_oid_destroy(&rbd_dev->header_oid);
4751 ceph_oloc_destroy(&rbd_dev->header_oloc);
4752 kfree(rbd_dev->config_info);
4753
4754 rbd_put_client(rbd_dev->rbd_client);
4755 rbd_spec_put(rbd_dev->spec);
4756 kfree(rbd_dev->opts);
4757 kfree(rbd_dev);
4758}
4759
4760static void rbd_dev_release(struct device *dev)
4761{
4762 struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
4763 bool need_put = !!rbd_dev->opts;
4764
4765 if (need_put) {
4766 destroy_workqueue(rbd_dev->task_wq);
4767 ida_simple_remove(&rbd_dev_id_ida, rbd_dev->dev_id);
4768 }
4769
4770 rbd_dev_free(rbd_dev);
4771
4772 /*
4773 * This is racy, but way better than putting module outside of
4774 * the release callback. The race window is pretty small, so
4775 * doing something similar to dm (dm-builtin.c) is overkill.
4776 */
4777 if (need_put)
4778 module_put(THIS_MODULE);
4779}
4780
4781static struct rbd_device *__rbd_dev_create(struct rbd_client *rbdc,
4782 struct rbd_spec *spec)
4783{
4784 struct rbd_device *rbd_dev;
4785
4786 rbd_dev = kzalloc(sizeof(*rbd_dev), GFP_KERNEL);
4787 if (!rbd_dev)
4788 return NULL;
4789
4790 spin_lock_init(&rbd_dev->lock);
4791 INIT_LIST_HEAD(&rbd_dev->node);
4792 init_rwsem(&rbd_dev->header_rwsem);
4793
4794 rbd_dev->header.data_pool_id = CEPH_NOPOOL;
4795 ceph_oid_init(&rbd_dev->header_oid);
4796 rbd_dev->header_oloc.pool = spec->pool_id;
4797
4798 mutex_init(&rbd_dev->watch_mutex);
4799 rbd_dev->watch_state = RBD_WATCH_STATE_UNREGISTERED;
4800 INIT_DELAYED_WORK(&rbd_dev->watch_dwork, rbd_reregister_watch);
4801
4802 init_rwsem(&rbd_dev->lock_rwsem);
4803 rbd_dev->lock_state = RBD_LOCK_STATE_UNLOCKED;
4804 INIT_WORK(&rbd_dev->acquired_lock_work, rbd_notify_acquired_lock);
4805 INIT_WORK(&rbd_dev->released_lock_work, rbd_notify_released_lock);
4806 INIT_DELAYED_WORK(&rbd_dev->lock_dwork, rbd_acquire_lock);
4807 INIT_WORK(&rbd_dev->unlock_work, rbd_release_lock_work);
4808 init_waitqueue_head(&rbd_dev->lock_waitq);
4809
4810 rbd_dev->dev.bus = &rbd_bus_type;
4811 rbd_dev->dev.type = &rbd_device_type;
4812 rbd_dev->dev.parent = &rbd_root_dev;
4813 device_initialize(&rbd_dev->dev);
4814
4815 rbd_dev->rbd_client = rbdc;
4816 rbd_dev->spec = spec;
4817
4818 return rbd_dev;
4819}
4820
4821/*
4822 * Create a mapping rbd_dev.
4823 */
4824static struct rbd_device *rbd_dev_create(struct rbd_client *rbdc,
4825 struct rbd_spec *spec,
4826 struct rbd_options *opts)
4827{
4828 struct rbd_device *rbd_dev;
4829
4830 rbd_dev = __rbd_dev_create(rbdc, spec);
4831 if (!rbd_dev)
4832 return NULL;
4833
4834 rbd_dev->opts = opts;
4835
4836 /* get an id and fill in device name */
4837 rbd_dev->dev_id = ida_simple_get(&rbd_dev_id_ida, 0,
4838 minor_to_rbd_dev_id(1 << MINORBITS),
4839 GFP_KERNEL);
4840 if (rbd_dev->dev_id < 0)
4841 goto fail_rbd_dev;
4842
4843 sprintf(rbd_dev->name, RBD_DRV_NAME "%d", rbd_dev->dev_id);
4844 rbd_dev->task_wq = alloc_ordered_workqueue("%s-tasks", WQ_MEM_RECLAIM,
4845 rbd_dev->name);
4846 if (!rbd_dev->task_wq)
4847 goto fail_dev_id;
4848
4849 /* we have a ref from do_rbd_add() */
4850 __module_get(THIS_MODULE);
4851
4852 dout("%s rbd_dev %p dev_id %d\n", __func__, rbd_dev, rbd_dev->dev_id);
4853 return rbd_dev;
4854
4855fail_dev_id:
4856 ida_simple_remove(&rbd_dev_id_ida, rbd_dev->dev_id);
4857fail_rbd_dev:
4858 rbd_dev_free(rbd_dev);
4859 return NULL;
4860}
4861
4862static void rbd_dev_destroy(struct rbd_device *rbd_dev)
4863{
4864 if (rbd_dev)
4865 put_device(&rbd_dev->dev);
4866}
4867
4868/*
4869 * Get the size and object order for an image snapshot or, if
4870 * snap_id is CEPH_NOSNAP, get this information for the base
4871 * image.
4872 */
4873static int _rbd_dev_v2_snap_size(struct rbd_device *rbd_dev, u64 snap_id,
4874 u8 *order, u64 *snap_size)
4875{
4876 __le64 snapid = cpu_to_le64(snap_id);
4877 int ret;
4878 struct {
4879 u8 order;
4880 __le64 size;
4881 } __attribute__ ((packed)) size_buf = { 0 };
4882
4883 ret = rbd_obj_method_sync(rbd_dev, &rbd_dev->header_oid,
4884 &rbd_dev->header_oloc, "get_size",
4885 &snapid, sizeof(snapid),
4886 &size_buf, sizeof(size_buf));
4887 dout("%s: rbd_obj_method_sync returned %d\n", __func__, ret);
4888 if (ret < 0)
4889 return ret;
4890 if (ret < sizeof (size_buf))
4891 return -ERANGE;
4892
4893 if (order) {
4894 *order = size_buf.order;
4895 dout(" order %u", (unsigned int)*order);
4896 }
4897 *snap_size = le64_to_cpu(size_buf.size);
4898
4899 dout(" snap_id 0x%016llx snap_size = %llu\n",
4900 (unsigned long long)snap_id,
4901 (unsigned long long)*snap_size);
4902
4903 return 0;
4904}
4905
4906static int rbd_dev_v2_image_size(struct rbd_device *rbd_dev)
4907{
4908 return _rbd_dev_v2_snap_size(rbd_dev, CEPH_NOSNAP,
4909 &rbd_dev->header.obj_order,
4910 &rbd_dev->header.image_size);
4911}
4912
4913static int rbd_dev_v2_object_prefix(struct rbd_device *rbd_dev)
4914{
4915 void *reply_buf;
4916 int ret;
4917 void *p;
4918
4919 reply_buf = kzalloc(RBD_OBJ_PREFIX_LEN_MAX, GFP_KERNEL);
4920 if (!reply_buf)
4921 return -ENOMEM;
4922
4923 ret = rbd_obj_method_sync(rbd_dev, &rbd_dev->header_oid,
4924 &rbd_dev->header_oloc, "get_object_prefix",
4925 NULL, 0, reply_buf, RBD_OBJ_PREFIX_LEN_MAX);
4926 dout("%s: rbd_obj_method_sync returned %d\n", __func__, ret);
4927 if (ret < 0)
4928 goto out;
4929
4930 p = reply_buf;
4931 rbd_dev->header.object_prefix = ceph_extract_encoded_string(&p,
4932 p + ret, NULL, GFP_NOIO);
4933 ret = 0;
4934
4935 if (IS_ERR(rbd_dev->header.object_prefix)) {
4936 ret = PTR_ERR(rbd_dev->header.object_prefix);
4937 rbd_dev->header.object_prefix = NULL;
4938 } else {
4939 dout(" object_prefix = %s\n", rbd_dev->header.object_prefix);
4940 }
4941out:
4942 kfree(reply_buf);
4943
4944 return ret;
4945}
4946
4947static int _rbd_dev_v2_snap_features(struct rbd_device *rbd_dev, u64 snap_id,
4948 u64 *snap_features)
4949{
4950 __le64 snapid = cpu_to_le64(snap_id);
4951 struct {
4952 __le64 features;
4953 __le64 incompat;
4954 } __attribute__ ((packed)) features_buf = { 0 };
4955 u64 unsup;
4956 int ret;
4957
4958 ret = rbd_obj_method_sync(rbd_dev, &rbd_dev->header_oid,
4959 &rbd_dev->header_oloc, "get_features",
4960 &snapid, sizeof(snapid),
4961 &features_buf, sizeof(features_buf));
4962 dout("%s: rbd_obj_method_sync returned %d\n", __func__, ret);
4963 if (ret < 0)
4964 return ret;
4965 if (ret < sizeof (features_buf))
4966 return -ERANGE;
4967
4968 unsup = le64_to_cpu(features_buf.incompat) & ~RBD_FEATURES_SUPPORTED;
4969 if (unsup) {
4970 rbd_warn(rbd_dev, "image uses unsupported features: 0x%llx",
4971 unsup);
4972 return -ENXIO;
4973 }
4974
4975 *snap_features = le64_to_cpu(features_buf.features);
4976
4977 dout(" snap_id 0x%016llx features = 0x%016llx incompat = 0x%016llx\n",
4978 (unsigned long long)snap_id,
4979 (unsigned long long)*snap_features,
4980 (unsigned long long)le64_to_cpu(features_buf.incompat));
4981
4982 return 0;
4983}
4984
4985static int rbd_dev_v2_features(struct rbd_device *rbd_dev)
4986{
4987 return _rbd_dev_v2_snap_features(rbd_dev, CEPH_NOSNAP,
4988 &rbd_dev->header.features);
4989}
4990
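/*
 * Fetch the parent spec and overlap for a format 2 image and update
 * rbd_dev accordingly.  Also detects a clone having been flattened
 * since the last refresh.
 */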
4991static int rbd_dev_v2_parent_info(struct rbd_device *rbd_dev)
4992{
4993 struct rbd_spec *parent_spec;
4994 size_t size;
4995 void *reply_buf = NULL;
4996 __le64 snapid;
4997 void *p;
4998 void *end;
4999 u64 pool_id;
5000 char *image_id;
5001 u64 snap_id;
5002 u64 overlap;
5003 int ret;
5004
5005 parent_spec = rbd_spec_alloc();
5006 if (!parent_spec)
5007 return -ENOMEM;
5008
5009 size = sizeof (__le64) + /* pool_id */
5010 sizeof (__le32) + RBD_IMAGE_ID_LEN_MAX + /* image_id */
5011 sizeof (__le64) + /* snap_id */
5012 sizeof (__le64); /* overlap */
5013 reply_buf = kmalloc(size, GFP_KERNEL);
5014 if (!reply_buf) {
5015 ret = -ENOMEM;
5016 goto out_err;
5017 }
5018
5019 snapid = cpu_to_le64(rbd_dev->spec->snap_id);
5020 ret = rbd_obj_method_sync(rbd_dev, &rbd_dev->header_oid,
5021 &rbd_dev->header_oloc, "get_parent",
5022 &snapid, sizeof(snapid), reply_buf, size);
5023 dout("%s: rbd_obj_method_sync returned %d\n", __func__, ret);
5024 if (ret < 0)
5025 goto out_err;
5026
5027 p = reply_buf;
5028 end = reply_buf + ret;
5029 ret = -ERANGE;
5030 ceph_decode_64_safe(&p, end, pool_id, out_err);
5031 if (pool_id == CEPH_NOPOOL) {
5032 /*
5033		 * Either the parent never existed, or we have a
5034		 * record of it but the image got flattened so it no
5035 * longer has a parent. When the parent of a
5036 * layered image disappears we immediately set the
5037 * overlap to 0. The effect of this is that all new
5038 * requests will be treated as if the image had no
5039 * parent.
5040 */
5041 if (rbd_dev->parent_overlap) {
5042 rbd_dev->parent_overlap = 0;
5043 rbd_dev_parent_put(rbd_dev);
5044 pr_info("%s: clone image has been flattened\n",
5045 rbd_dev->disk->disk_name);
5046 }
5047
5048 goto out; /* No parent? No problem. */
5049 }
5050
5051 /* The ceph file layout needs to fit pool id in 32 bits */
5052
5053 ret = -EIO;
5054 if (pool_id > (u64)U32_MAX) {
5055 rbd_warn(NULL, "parent pool id too large (%llu > %u)",
5056 (unsigned long long)pool_id, U32_MAX);
5057 goto out_err;
5058 }
5059
5060 image_id = ceph_extract_encoded_string(&p, end, NULL, GFP_KERNEL);
5061 if (IS_ERR(image_id)) {
5062 ret = PTR_ERR(image_id);
5063 goto out_err;
5064 }
5065 ceph_decode_64_safe(&p, end, snap_id, out_err);
5066 ceph_decode_64_safe(&p, end, overlap, out_err);
5067
5068 /*
5069	 * The parent won't change (except when the clone is
5070	 * flattened, which is handled above).  So we only need to
5071	 * record the parent spec if we have not already done so.
5072 */
5073 if (!rbd_dev->parent_spec) {
5074 parent_spec->pool_id = pool_id;
5075 parent_spec->image_id = image_id;
5076 parent_spec->snap_id = snap_id;
5077 rbd_dev->parent_spec = parent_spec;
5078 parent_spec = NULL; /* rbd_dev now owns this */
5079 } else {
5080 kfree(image_id);
5081 }
5082
5083 /*
5084 * We always update the parent overlap. If it's zero we issue
5085 * a warning, as we will proceed as if there was no parent.
5086 */
5087 if (!overlap) {
5088 if (parent_spec) {
5089 /* refresh, careful to warn just once */
5090 if (rbd_dev->parent_overlap)
5091 rbd_warn(rbd_dev,
5092 "clone now standalone (overlap became 0)");
5093 } else {
5094 /* initial probe */
5095 rbd_warn(rbd_dev, "clone is standalone (overlap 0)");
5096 }
5097 }
5098 rbd_dev->parent_overlap = overlap;
5099
5100out:
5101 ret = 0;
5102out_err:
5103 kfree(reply_buf);
5104 rbd_spec_put(parent_spec);
5105
5106 return ret;
5107}
5108
5109static int rbd_dev_v2_striping_info(struct rbd_device *rbd_dev)
5110{
5111 struct {
5112 __le64 stripe_unit;
5113 __le64 stripe_count;
5114 } __attribute__ ((packed)) striping_info_buf = { 0 };
5115 size_t size = sizeof (striping_info_buf);
5116 void *p;
5117 u64 obj_size;
5118 u64 stripe_unit;
5119 u64 stripe_count;
5120 int ret;
5121
5122 ret = rbd_obj_method_sync(rbd_dev, &rbd_dev->header_oid,
5123 &rbd_dev->header_oloc, "get_stripe_unit_count",
5124 NULL, 0, &striping_info_buf, size);
5125 dout("%s: rbd_obj_method_sync returned %d\n", __func__, ret);
5126 if (ret < 0)
5127 return ret;
5128 if (ret < size)
5129 return -ERANGE;
5130
5131 /*
5132 * We don't actually support the "fancy striping" feature
5133 * (STRIPINGV2) yet, but if the striping sizes are the
5134 * defaults the behavior is the same as before. So find
5135 * out, and only fail if the image has non-default values.
5136 */
5137 ret = -EINVAL;
5138 obj_size = rbd_obj_bytes(&rbd_dev->header);
5139 p = &striping_info_buf;
5140 stripe_unit = ceph_decode_64(&p);
5141 if (stripe_unit != obj_size) {
5142		rbd_warn(rbd_dev,
5143			 "unsupported stripe unit (got %llu want %llu)",
5144			 stripe_unit, obj_size);
5145 return -EINVAL;
5146 }
5147 stripe_count = ceph_decode_64(&p);
5148 if (stripe_count != 1) {
5149		rbd_warn(rbd_dev, "unsupported stripe count (got %llu want 1)",
5150			 stripe_count);
5151 return -EINVAL;
5152 }
5153 rbd_dev->header.stripe_unit = stripe_unit;
5154 rbd_dev->header.stripe_count = stripe_count;
5155
5156 return 0;
5157}
5158
5159static int rbd_dev_v2_data_pool(struct rbd_device *rbd_dev)
5160{
5161 __le64 data_pool_id;
5162 int ret;
5163
5164 ret = rbd_obj_method_sync(rbd_dev, &rbd_dev->header_oid,
5165 &rbd_dev->header_oloc, "get_data_pool",
5166 NULL, 0, &data_pool_id, sizeof(data_pool_id));
5167 if (ret < 0)
5168 return ret;
5169 if (ret < sizeof(data_pool_id))
5170 return -EBADMSG;
5171
5172 rbd_dev->header.data_pool_id = le64_to_cpu(data_pool_id);
5173 WARN_ON(rbd_dev->header.data_pool_id == CEPH_NOPOOL);
5174 return 0;
5175}
5176
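/*
 * Look up the image name for rbd_dev's image id in the pool's rbd
 * directory object.  Returns NULL on any failure; callers treat the
 * name as optional.
 */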
5177static char *rbd_dev_image_name(struct rbd_device *rbd_dev)
5178{
5179 CEPH_DEFINE_OID_ONSTACK(oid);
5180 size_t image_id_size;
5181 char *image_id;
5182 void *p;
5183 void *end;
5184 size_t size;
5185 void *reply_buf = NULL;
5186 size_t len = 0;
5187 char *image_name = NULL;
5188 int ret;
5189
5190 rbd_assert(!rbd_dev->spec->image_name);
5191
5192 len = strlen(rbd_dev->spec->image_id);
5193 image_id_size = sizeof (__le32) + len;
5194 image_id = kmalloc(image_id_size, GFP_KERNEL);
5195 if (!image_id)
5196 return NULL;
5197
5198 p = image_id;
5199 end = image_id + image_id_size;
5200 ceph_encode_string(&p, end, rbd_dev->spec->image_id, (u32)len);
5201
5202 size = sizeof (__le32) + RBD_IMAGE_NAME_LEN_MAX;
5203 reply_buf = kmalloc(size, GFP_KERNEL);
5204 if (!reply_buf)
5205 goto out;
5206
5207 ceph_oid_printf(&oid, "%s", RBD_DIRECTORY);
5208 ret = rbd_obj_method_sync(rbd_dev, &oid, &rbd_dev->header_oloc,
5209 "dir_get_name", image_id, image_id_size,
5210 reply_buf, size);
5211 if (ret < 0)
5212 goto out;
5213 p = reply_buf;
5214 end = reply_buf + ret;
5215
5216 image_name = ceph_extract_encoded_string(&p, end, &len, GFP_KERNEL);
5217 if (IS_ERR(image_name))
5218 image_name = NULL;
5219 else
5220 dout("%s: name is %s len is %zd\n", __func__, image_name, len);
5221out:
5222 kfree(reply_buf);
5223 kfree(image_id);
5224
5225 return image_name;
5226}
5227
5228static u64 rbd_v1_snap_id_by_name(struct rbd_device *rbd_dev, const char *name)
5229{
5230 struct ceph_snap_context *snapc = rbd_dev->header.snapc;
5231 const char *snap_name;
5232 u32 which = 0;
5233
5234 /* Skip over names until we find the one we are looking for */
5235
5236 snap_name = rbd_dev->header.snap_names;
5237 while (which < snapc->num_snaps) {
5238 if (!strcmp(name, snap_name))
5239 return snapc->snaps[which];
5240 snap_name += strlen(snap_name) + 1;
5241 which++;
5242 }
5243 return CEPH_NOSNAP;
5244}
5245
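/*
 * Format 2 snapshot names are not stored in the header, so fetch the
 * name of each snapshot id in turn, skipping snapshots deleted in the
 * meantime.
 */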
5246static u64 rbd_v2_snap_id_by_name(struct rbd_device *rbd_dev, const char *name)
5247{
5248 struct ceph_snap_context *snapc = rbd_dev->header.snapc;
5249 u32 which;
5250 bool found = false;
5251 u64 snap_id;
5252
5253 for (which = 0; !found && which < snapc->num_snaps; which++) {
5254 const char *snap_name;
5255
5256 snap_id = snapc->snaps[which];
5257 snap_name = rbd_dev_v2_snap_name(rbd_dev, snap_id);
5258 if (IS_ERR(snap_name)) {
5259 /* ignore no-longer existing snapshots */
5260 if (PTR_ERR(snap_name) == -ENOENT)
5261 continue;
5262 else
5263 break;
5264 }
5265 found = !strcmp(name, snap_name);
5266 kfree(snap_name);
5267 }
5268 return found ? snap_id : CEPH_NOSNAP;
5269}
5270
5271/*
5272 * Assumes name is never RBD_SNAP_HEAD_NAME; returns CEPH_NOSNAP if
5273 * no snapshot by that name is found, or if an error occurs.
5274 */
5275static u64 rbd_snap_id_by_name(struct rbd_device *rbd_dev, const char *name)
5276{
5277 if (rbd_dev->image_format == 1)
5278 return rbd_v1_snap_id_by_name(rbd_dev, name);
5279
5280 return rbd_v2_snap_id_by_name(rbd_dev, name);
5281}
5282
5283/*
5284 * An image being mapped will have everything but the snap id.
5285 */
5286static int rbd_spec_fill_snap_id(struct rbd_device *rbd_dev)
5287{
5288 struct rbd_spec *spec = rbd_dev->spec;
5289
5290 rbd_assert(spec->pool_id != CEPH_NOPOOL && spec->pool_name);
5291 rbd_assert(spec->image_id && spec->image_name);
5292 rbd_assert(spec->snap_name);
5293
5294 if (strcmp(spec->snap_name, RBD_SNAP_HEAD_NAME)) {
5295 u64 snap_id;
5296
5297 snap_id = rbd_snap_id_by_name(rbd_dev, spec->snap_name);
5298 if (snap_id == CEPH_NOSNAP)
5299 return -ENOENT;
5300
5301 spec->snap_id = snap_id;
5302 } else {
5303 spec->snap_id = CEPH_NOSNAP;
5304 }
5305
5306 return 0;
5307}
5308
5309/*
5310 * A parent image will have all ids but none of the names.
5311 *
5312 * All names in an rbd spec are dynamically allocated. It's OK if we
5313 * can't figure out the name for an image id.
5314 */
5315static int rbd_spec_fill_names(struct rbd_device *rbd_dev)
5316{
5317 struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc;
5318 struct rbd_spec *spec = rbd_dev->spec;
5319 const char *pool_name;
5320 const char *image_name;
5321 const char *snap_name;
5322 int ret;
5323
5324 rbd_assert(spec->pool_id != CEPH_NOPOOL);
5325 rbd_assert(spec->image_id);
5326 rbd_assert(spec->snap_id != CEPH_NOSNAP);
5327
5328 /* Get the pool name; we have to make our own copy of this */
5329
5330 pool_name = ceph_pg_pool_name_by_id(osdc->osdmap, spec->pool_id);
5331 if (!pool_name) {
5332 rbd_warn(rbd_dev, "no pool with id %llu", spec->pool_id);
5333 return -EIO;
5334 }
5335 pool_name = kstrdup(pool_name, GFP_KERNEL);
5336 if (!pool_name)
5337 return -ENOMEM;
5338
5339 /* Fetch the image name; tolerate failure here */
5340
5341 image_name = rbd_dev_image_name(rbd_dev);
5342 if (!image_name)
5343 rbd_warn(rbd_dev, "unable to get image name");
5344
5345 /* Fetch the snapshot name */
5346
5347 snap_name = rbd_snap_name(rbd_dev, spec->snap_id);
5348 if (IS_ERR(snap_name)) {
5349 ret = PTR_ERR(snap_name);
5350 goto out_err;
5351 }
5352
5353 spec->pool_name = pool_name;
5354 spec->image_name = image_name;
5355 spec->snap_name = snap_name;
5356
5357 return 0;
5358
5359out_err:
5360 kfree(image_name);
5361 kfree(pool_name);
5362 return ret;
5363}
5364
5365static int rbd_dev_v2_snap_context(struct rbd_device *rbd_dev)
5366{
5367 size_t size;
5368 int ret;
5369 void *reply_buf;
5370 void *p;
5371 void *end;
5372 u64 seq;
5373 u32 snap_count;
5374 struct ceph_snap_context *snapc;
5375 u32 i;
5376
5377 /*
5378 * We'll need room for the seq value (maximum snapshot id),
5379 * snapshot count, and array of that many snapshot ids.
5380 * For now we have a fixed upper limit on the number we're
5381 * prepared to receive.
5382 */
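	/*
	 * Worst case (RBD_MAX_SNAP_COUNT == 510) this is
	 * 8 + 4 + 510 * 8 = 4092 bytes, which fits in a 4 KiB
	 * allocation.
	 */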
5383 size = sizeof (__le64) + sizeof (__le32) +
5384 RBD_MAX_SNAP_COUNT * sizeof (__le64);
5385 reply_buf = kzalloc(size, GFP_KERNEL);
5386 if (!reply_buf)
5387 return -ENOMEM;
5388
5389 ret = rbd_obj_method_sync(rbd_dev, &rbd_dev->header_oid,
5390 &rbd_dev->header_oloc, "get_snapcontext",
5391 NULL, 0, reply_buf, size);
5392 dout("%s: rbd_obj_method_sync returned %d\n", __func__, ret);
5393 if (ret < 0)
5394 goto out;
5395
5396 p = reply_buf;
5397 end = reply_buf + ret;
5398 ret = -ERANGE;
5399 ceph_decode_64_safe(&p, end, seq, out);
5400 ceph_decode_32_safe(&p, end, snap_count, out);
5401
5402 /*
5403 * Make sure the reported number of snapshot ids wouldn't go
5404 * beyond the end of our buffer. But before checking that,
5405 * make sure the computed size of the snapshot context we
5406 * allocate is representable in a size_t.
5407 */
5408 if (snap_count > (SIZE_MAX - sizeof (struct ceph_snap_context))
5409 / sizeof (u64)) {
5410 ret = -EINVAL;
5411 goto out;
5412 }
5413 if (!ceph_has_room(&p, end, snap_count * sizeof (__le64)))
5414 goto out;
5415 ret = 0;
5416
5417 snapc = ceph_create_snap_context(snap_count, GFP_KERNEL);
5418 if (!snapc) {
5419 ret = -ENOMEM;
5420 goto out;
5421 }
5422 snapc->seq = seq;
5423 for (i = 0; i < snap_count; i++)
5424 snapc->snaps[i] = ceph_decode_64(&p);
5425
5426 ceph_put_snap_context(rbd_dev->header.snapc);
5427 rbd_dev->header.snapc = snapc;
5428
5429 dout(" snap context seq = %llu, snap_count = %u\n",
5430 (unsigned long long)seq, (unsigned int)snap_count);
5431out:
5432 kfree(reply_buf);
5433
5434 return ret;
5435}
5436
5437static const char *rbd_dev_v2_snap_name(struct rbd_device *rbd_dev,
5438 u64 snap_id)
5439{
5440 size_t size;
5441 void *reply_buf;
5442 __le64 snapid;
5443 int ret;
5444 void *p;
5445 void *end;
5446 char *snap_name;
5447
5448 size = sizeof (__le32) + RBD_MAX_SNAP_NAME_LEN;
5449 reply_buf = kmalloc(size, GFP_KERNEL);
5450 if (!reply_buf)
5451 return ERR_PTR(-ENOMEM);
5452
5453 snapid = cpu_to_le64(snap_id);
5454 ret = rbd_obj_method_sync(rbd_dev, &rbd_dev->header_oid,
5455 &rbd_dev->header_oloc, "get_snapshot_name",
5456 &snapid, sizeof(snapid), reply_buf, size);
5457 dout("%s: rbd_obj_method_sync returned %d\n", __func__, ret);
5458 if (ret < 0) {
5459 snap_name = ERR_PTR(ret);
5460 goto out;
5461 }
5462
5463 p = reply_buf;
5464 end = reply_buf + ret;
5465 snap_name = ceph_extract_encoded_string(&p, end, NULL, GFP_KERNEL);
5466 if (IS_ERR(snap_name))
5467 goto out;
5468
5469 dout(" snap_id 0x%016llx snap_name = %s\n",
5470 (unsigned long long)snap_id, snap_name);
5471out:
5472 kfree(reply_buf);
5473
5474 return snap_name;
5475}
5476
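/*
 * Fetch the current image size and, on the first call, the one-time
 * header fields via rbd_dev_v2_header_onetime(); then (re)read the
 * snapshot context.
 */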
5477static int rbd_dev_v2_header_info(struct rbd_device *rbd_dev)
5478{
5479 bool first_time = rbd_dev->header.object_prefix == NULL;
5480 int ret;
5481
5482 ret = rbd_dev_v2_image_size(rbd_dev);
5483 if (ret)
5484 return ret;
5485
5486 if (first_time) {
5487 ret = rbd_dev_v2_header_onetime(rbd_dev);
5488 if (ret)
5489 return ret;
5490 }
5491
5492 ret = rbd_dev_v2_snap_context(rbd_dev);
5493 if (ret && first_time) {
5494 kfree(rbd_dev->header.object_prefix);
5495 rbd_dev->header.object_prefix = NULL;
5496 }
5497
5498 return ret;
5499}
5500
5501static int rbd_dev_header_info(struct rbd_device *rbd_dev)
5502{
5503 rbd_assert(rbd_image_format_valid(rbd_dev->image_format));
5504
5505 if (rbd_dev->image_format == 1)
5506 return rbd_dev_v1_header_info(rbd_dev);
5507
5508 return rbd_dev_v2_header_info(rbd_dev);
5509}
5510
5511/*
5512 * Skips over white space at *buf, and updates *buf to point to the
5513 * first found non-space character (if any). Returns the length of
5514 * the token (string of non-white space characters) found. Note
5515 * that *buf must be terminated with '\0'.
5516 */
5517static inline size_t next_token(const char **buf)
5518{
5519 /*
5520 * These are the characters that produce nonzero for
5521 * isspace() in the "C" and "POSIX" locales.
5522 */
5523 const char *spaces = " \f\n\r\t\v";
5524
5525 *buf += strspn(*buf, spaces); /* Find start of token */
5526
5527 return strcspn(*buf, spaces); /* Return token length */
5528}
5529
5530/*
5531 * Finds the next token in *buf, dynamically allocates a buffer big
5532 * enough to hold a copy of it, and copies the token into the new
5533 * buffer. The copy is guaranteed to be terminated with '\0'. Note
5534 * that a duplicate buffer is created even for a zero-length token.
5535 *
5536 * Returns a pointer to the newly-allocated duplicate, or a null
5537 * pointer if memory for the duplicate was not available. If
5538 * the lenp argument is a non-null pointer, the length of the token
5539 * (not including the '\0') is returned in *lenp.
5540 *
5541 * If successful, the *buf pointer will be updated to point beyond
5542 * the end of the found token.
5543 *
5544 * Note: uses GFP_KERNEL for allocation.
5545 */
5546static inline char *dup_token(const char **buf, size_t *lenp)
5547{
5548 char *dup;
5549 size_t len;
5550
5551 len = next_token(buf);
5552 dup = kmemdup(*buf, len + 1, GFP_KERNEL);
5553 if (!dup)
5554 return NULL;
5555 *(dup + len) = '\0';
5556 *buf += len;
5557
5558 if (lenp)
5559 *lenp = len;
5560
5561 return dup;
5562}
5563
5564/*
5565 * Parse the options provided for an "rbd add" (i.e., rbd image
5566 * mapping) request. These arrive via a write to /sys/bus/rbd/add,
5567 * and the data written is passed here via a NUL-terminated buffer.
5568 * Returns 0 if successful or an error code otherwise.
5569 *
5570 * The information extracted from these options is recorded in
5571 * the other parameters which return dynamically-allocated
5572 * structures:
5573 * ceph_opts
5574 * The address of a pointer that will refer to a ceph options
5575 * structure. Caller must release the returned pointer using
5576 * ceph_destroy_options() when it is no longer needed.
5577 * rbd_opts
5578 * Address of an rbd options pointer. Fully initialized by
5579 * this function; caller must release with kfree().
5580 * spec
5581 * Address of an rbd image specification pointer. Fully
5582 * initialized by this function based on parsed options.
5583 * Caller must release with rbd_spec_put().
5584 *
5585 * The options passed take this form:
5586 *  <mon_addrs> <options> <pool_name> <image_name> [<snap_name>]
5587 * where:
5588 * <mon_addrs>
5589 * A comma-separated list of one or more monitor addresses.
5590 * A monitor address is an ip address, optionally followed
5591 * by a port number (separated by a colon).
5592 * I.e.: ip1[:port1][,ip2[:port2]...]
5593 * <options>
5594 * A comma-separated list of ceph and/or rbd options.
5595 * <pool_name>
5596 * The name of the rados pool containing the rbd image.
5597 * <image_name>
5598 * The name of the image in that pool to map.
5599 *  <snap_name>
5600 *	An optional snapshot name.  If provided, the mapping will
5601 *	present data from the image at the time that snapshot was
5602 *	created.  The image head is used if no snapshot name is
5603 *	provided.  Snapshot mappings are always read-only.
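 *
 * For example, a write of the form (illustrative values only)
 *	1.2.3.4:6789 name=admin,secret=<key> rbd foo foosnap
 * would map snapshot "foosnap" of image "foo" in pool "rbd".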
5604 */
5605static int rbd_add_parse_args(const char *buf,
5606 struct ceph_options **ceph_opts,
5607 struct rbd_options **opts,
5608 struct rbd_spec **rbd_spec)
5609{
5610 size_t len;
5611 char *options;
5612 const char *mon_addrs;
5613 char *snap_name;
5614 size_t mon_addrs_size;
5615 struct rbd_spec *spec = NULL;
5616 struct rbd_options *rbd_opts = NULL;
5617 struct ceph_options *copts;
5618 int ret;
5619
5620 /* The first four tokens are required */
5621
5622 len = next_token(&buf);
5623 if (!len) {
5624 rbd_warn(NULL, "no monitor address(es) provided");
5625 return -EINVAL;
5626 }
5627 mon_addrs = buf;
5628 mon_addrs_size = len + 1;
5629 buf += len;
5630
5631 ret = -EINVAL;
5632 options = dup_token(&buf, NULL);
5633 if (!options)
5634 return -ENOMEM;
5635 if (!*options) {
5636 rbd_warn(NULL, "no options provided");
5637 goto out_err;
5638 }
5639
5640 spec = rbd_spec_alloc();
5641 if (!spec)
5642 goto out_mem;
5643
5644 spec->pool_name = dup_token(&buf, NULL);
5645 if (!spec->pool_name)
5646 goto out_mem;
5647 if (!*spec->pool_name) {
5648 rbd_warn(NULL, "no pool name provided");
5649 goto out_err;
5650 }
5651
5652 spec->image_name = dup_token(&buf, NULL);
5653 if (!spec->image_name)
5654 goto out_mem;
5655 if (!*spec->image_name) {
5656 rbd_warn(NULL, "no image name provided");
5657 goto out_err;
5658 }
5659
5660 /*
5661 * Snapshot name is optional; default is to use "-"
5662 * (indicating the head/no snapshot).
5663 */
5664 len = next_token(&buf);
5665 if (!len) {
5666 buf = RBD_SNAP_HEAD_NAME; /* No snapshot supplied */
5667 len = sizeof (RBD_SNAP_HEAD_NAME) - 1;
5668 } else if (len > RBD_MAX_SNAP_NAME_LEN) {
5669 ret = -ENAMETOOLONG;
5670 goto out_err;
5671 }
5672 snap_name = kmemdup(buf, len + 1, GFP_KERNEL);
5673 if (!snap_name)
5674 goto out_mem;
5675 *(snap_name + len) = '\0';
5676 spec->snap_name = snap_name;
5677
5678 /* Initialize all rbd options to the defaults */
5679
5680 rbd_opts = kzalloc(sizeof (*rbd_opts), GFP_KERNEL);
5681 if (!rbd_opts)
5682 goto out_mem;
5683
5684 rbd_opts->read_only = RBD_READ_ONLY_DEFAULT;
5685 rbd_opts->queue_depth = RBD_QUEUE_DEPTH_DEFAULT;
5686 rbd_opts->lock_on_read = RBD_LOCK_ON_READ_DEFAULT;
5687 rbd_opts->exclusive = RBD_EXCLUSIVE_DEFAULT;
5688
5689 copts = ceph_parse_options(options, mon_addrs,
5690 mon_addrs + mon_addrs_size - 1,
5691 parse_rbd_opts_token, rbd_opts);
5692 if (IS_ERR(copts)) {
5693 ret = PTR_ERR(copts);
5694 goto out_err;
5695 }
5696 kfree(options);
5697
5698 *ceph_opts = copts;
5699 *opts = rbd_opts;
5700 *rbd_spec = spec;
5701
5702 return 0;
5703out_mem:
5704 ret = -ENOMEM;
5705out_err:
5706 kfree(rbd_opts);
5707 rbd_spec_put(spec);
5708 kfree(options);
5709
5710 return ret;
5711}
5712
5713/*
5714 * Return pool id (>= 0) or a negative error code.
5715 */
5716static int rbd_add_get_pool_id(struct rbd_client *rbdc, const char *pool_name)
5717{
5718 struct ceph_options *opts = rbdc->client->options;
5719 u64 newest_epoch;
5720 int tries = 0;
5721 int ret;
5722
5723again:
5724 ret = ceph_pg_poolid_by_name(rbdc->client->osdc.osdmap, pool_name);
5725 if (ret == -ENOENT && tries++ < 1) {
5726 ret = ceph_monc_get_version(&rbdc->client->monc, "osdmap",
5727 &newest_epoch);
5728 if (ret < 0)
5729 return ret;
5730
5731 if (rbdc->client->osdc.osdmap->epoch < newest_epoch) {
5732 ceph_osdc_maybe_request_map(&rbdc->client->osdc);
5733 (void) ceph_monc_wait_osdmap(&rbdc->client->monc,
5734 newest_epoch,
5735 opts->mount_timeout);
5736 goto again;
5737 } else {
5738 /* the osdmap we have is new enough */
5739 return -ENOENT;
5740 }
5741 }
5742
5743 return ret;
5744}
5745
5746static void rbd_dev_image_unlock(struct rbd_device *rbd_dev)
5747{
5748 down_write(&rbd_dev->lock_rwsem);
5749 if (__rbd_is_lock_owner(rbd_dev))
5750 rbd_unlock(rbd_dev);
5751 up_write(&rbd_dev->lock_rwsem);
5752}
5753
5754static int rbd_add_acquire_lock(struct rbd_device *rbd_dev)
5755{
5756 if (!(rbd_dev->header.features & RBD_FEATURE_EXCLUSIVE_LOCK)) {
5757 rbd_warn(rbd_dev, "exclusive-lock feature is not enabled");
5758 return -EINVAL;
5759 }
5760
5761	/* FIXME: "rbd map --exclusive" should be interruptible */
5762 down_read(&rbd_dev->lock_rwsem);
5763 rbd_wait_state_locked(rbd_dev);
5764 up_read(&rbd_dev->lock_rwsem);
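	/*
	 * Being blacklisted while waiting means the lock can never be
	 * acquired by this client, so fail the exclusive mapping
	 * instead of leaving it stuck.
	 */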
5765 if (test_bit(RBD_DEV_FLAG_BLACKLISTED, &rbd_dev->flags)) {
5766 rbd_warn(rbd_dev, "failed to acquire exclusive lock");
5767 return -EROFS;
5768 }
5769
5770 return 0;
5771}
5772
5773/*
5774 * An rbd format 2 image has a unique identifier, distinct from the
5775 * name given to it by the user. Internally, that identifier is
5776 * what's used to specify the names of objects related to the image.
5777 *
5778 * A special "rbd id" object is used to map an rbd image name to its
5779 * id. If that object doesn't exist, then there is no v2 rbd image
5780 * with the supplied name.
5781 *
5782 * This function will record the given rbd_dev's image_id field if
5783 * it can be determined, and in that case will return 0. If any
5784 * errors occur a negative errno will be returned and the rbd_dev's
5785 * image_id field will be unchanged (and should be NULL).
5786 */
5787static int rbd_dev_image_id(struct rbd_device *rbd_dev)
5788{
5789 int ret;
5790 size_t size;
5791 CEPH_DEFINE_OID_ONSTACK(oid);
5792 void *response;
5793 char *image_id;
5794
5795 /*
5796 * When probing a parent image, the image id is already
5797 * known (and the image name likely is not). There's no
5798 * need to fetch the image id again in this case. We
5799 * do still need to set the image format though.
5800 */
5801 if (rbd_dev->spec->image_id) {
5802 rbd_dev->image_format = *rbd_dev->spec->image_id ? 2 : 1;
5803
5804 return 0;
5805 }
5806
5807 /*
5808	 * First, see if the format 2 image id object exists, and if
5809 * so, get the image's persistent id from it.
5810 */
5811 ret = ceph_oid_aprintf(&oid, GFP_KERNEL, "%s%s", RBD_ID_PREFIX,
5812 rbd_dev->spec->image_name);
5813 if (ret)
5814 return ret;
5815
5816 dout("rbd id object name is %s\n", oid.name);
5817
5818 /* Response will be an encoded string, which includes a length */
5819
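	/*
	 * The encoding is a __le32 byte count followed by the string
	 * bytes, hence the extra sizeof (__le32) reserved ahead of the
	 * maximum image id length below.
	 */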
5820 size = sizeof (__le32) + RBD_IMAGE_ID_LEN_MAX;
5821 response = kzalloc(size, GFP_NOIO);
5822 if (!response) {
5823 ret = -ENOMEM;
5824 goto out;
5825 }
5826
5827 /* If it doesn't exist we'll assume it's a format 1 image */
5828
5829 ret = rbd_obj_method_sync(rbd_dev, &oid, &rbd_dev->header_oloc,
5830 "get_id", NULL, 0,
5831 response, RBD_IMAGE_ID_LEN_MAX);
5832 dout("%s: rbd_obj_method_sync returned %d\n", __func__, ret);
5833 if (ret == -ENOENT) {
5834 image_id = kstrdup("", GFP_KERNEL);
5835 ret = image_id ? 0 : -ENOMEM;
5836 if (!ret)
5837 rbd_dev->image_format = 1;
5838 } else if (ret >= 0) {
5839 void *p = response;
5840
5841 image_id = ceph_extract_encoded_string(&p, p + ret,
5842 NULL, GFP_NOIO);
5843 ret = PTR_ERR_OR_ZERO(image_id);
5844 if (!ret)
5845 rbd_dev->image_format = 2;
5846 }
5847
5848 if (!ret) {
5849 rbd_dev->spec->image_id = image_id;
5850 dout("image_id is %s\n", image_id);
5851 }
5852out:
5853 kfree(response);
5854 ceph_oid_destroy(&oid);
5855 return ret;
5856}
5857
5858/*
5859 * Undo whatever state changes are made by a v1 or v2 header
5860 * info call.
5861 */
5862static void rbd_dev_unprobe(struct rbd_device *rbd_dev)
5863{
5864 struct rbd_image_header *header;
5865
5866 rbd_dev_parent_put(rbd_dev);
5867
5868 /* Free dynamic fields from the header, then zero it out */
5869
5870 header = &rbd_dev->header;
5871 ceph_put_snap_context(header->snapc);
5872 kfree(header->snap_sizes);
5873 kfree(header->snap_names);
5874 kfree(header->object_prefix);
5875 memset(header, 0, sizeof (*header));
5876}
5877
5878static int rbd_dev_v2_header_onetime(struct rbd_device *rbd_dev)
5879{
5880 int ret;
5881
5882 ret = rbd_dev_v2_object_prefix(rbd_dev);
5883 if (ret)
5884 goto out_err;
5885
5886 /*
5887	 * Get and check the features for the image. Currently the
5888 * features are assumed to never change.
5889 */
5890 ret = rbd_dev_v2_features(rbd_dev);
5891 if (ret)
5892 goto out_err;
5893
5894 /* If the image supports fancy striping, get its parameters */
5895
5896 if (rbd_dev->header.features & RBD_FEATURE_STRIPINGV2) {
5897 ret = rbd_dev_v2_striping_info(rbd_dev);
5898 if (ret < 0)
5899 goto out_err;
5900 }
5901
5902 if (rbd_dev->header.features & RBD_FEATURE_DATA_POOL) {
5903 ret = rbd_dev_v2_data_pool(rbd_dev);
5904 if (ret)
5905 goto out_err;
5906 }
5907
5908 rbd_init_layout(rbd_dev);
5909 return 0;
5910
5911out_err:
5912 rbd_dev->header.features = 0;
5913 kfree(rbd_dev->header.object_prefix);
5914 rbd_dev->header.object_prefix = NULL;
5915 return ret;
5916}
5917
5918/*
5919 * @depth is rbd_dev_image_probe() -> rbd_dev_probe_parent() ->
5920 * rbd_dev_image_probe() recursion depth, which means it's also the
5921 * length of the already discovered part of the parent chain.
5922 */
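/*
 * For example, mapping an image that is a clone of a clone enters
 * rbd_dev_image_probe() with depth 0, 1 and 2 as the chain is walked;
 * anything deeper than RBD_MAX_PARENT_CHAIN_LEN is rejected.
 */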
5923static int rbd_dev_probe_parent(struct rbd_device *rbd_dev, int depth)
5924{
5925 struct rbd_device *parent = NULL;
5926 int ret;
5927
5928 if (!rbd_dev->parent_spec)
5929 return 0;
5930
5931 if (++depth > RBD_MAX_PARENT_CHAIN_LEN) {
5932 pr_info("parent chain is too long (%d)\n", depth);
5933 ret = -EINVAL;
5934 goto out_err;
5935 }
5936
5937 parent = __rbd_dev_create(rbd_dev->rbd_client, rbd_dev->parent_spec);
5938 if (!parent) {
5939 ret = -ENOMEM;
5940 goto out_err;
5941 }
5942
5943 /*
5944 * Images related by parent/child relationships always share
5945 * rbd_client and spec/parent_spec, so bump their refcounts.
5946 */
5947 __rbd_get_client(rbd_dev->rbd_client);
5948 rbd_spec_get(rbd_dev->parent_spec);
5949
5950 ret = rbd_dev_image_probe(parent, depth);
5951 if (ret < 0)
5952 goto out_err;
5953
5954 rbd_dev->parent = parent;
5955 atomic_set(&rbd_dev->parent_ref, 1);
5956 return 0;
5957
5958out_err:
5959 rbd_dev_unparent(rbd_dev);
5960 rbd_dev_destroy(parent);
5961 return ret;
5962}
5963
5964static void rbd_dev_device_release(struct rbd_device *rbd_dev)
5965{
5966 clear_bit(RBD_DEV_FLAG_EXISTS, &rbd_dev->flags);
5967 rbd_dev_mapping_clear(rbd_dev);
5968 rbd_free_disk(rbd_dev);
5969 if (!single_major)
5970 unregister_blkdev(rbd_dev->major, rbd_dev->name);
5971}
5972
5973/*
5974 * rbd_dev->header_rwsem must be locked for write and will be unlocked
5975 * upon return.
5976 */
5977static int rbd_dev_device_setup(struct rbd_device *rbd_dev)
5978{
5979 int ret;
5980
5981 /* Record our major and minor device numbers. */
5982
5983 if (!single_major) {
5984 ret = register_blkdev(0, rbd_dev->name);
5985 if (ret < 0)
5986 goto err_out_unlock;
5987
5988 rbd_dev->major = ret;
5989 rbd_dev->minor = 0;
5990 } else {
5991 rbd_dev->major = rbd_major;
5992 rbd_dev->minor = rbd_dev_id_to_minor(rbd_dev->dev_id);
5993 }
5994
5995 /* Set up the blkdev mapping. */
5996
5997 ret = rbd_init_disk(rbd_dev);
5998 if (ret)
5999 goto err_out_blkdev;
6000
6001 ret = rbd_dev_mapping_set(rbd_dev);
6002 if (ret)
6003 goto err_out_disk;
6004
6005 set_capacity(rbd_dev->disk, rbd_dev->mapping.size / SECTOR_SIZE);
6006 set_disk_ro(rbd_dev->disk, rbd_dev->mapping.read_only);
6007
6008 ret = dev_set_name(&rbd_dev->dev, "%d", rbd_dev->dev_id);
6009 if (ret)
6010 goto err_out_mapping;
6011
6012 set_bit(RBD_DEV_FLAG_EXISTS, &rbd_dev->flags);
6013 up_write(&rbd_dev->header_rwsem);
6014 return 0;
6015
6016err_out_mapping:
6017 rbd_dev_mapping_clear(rbd_dev);
6018err_out_disk:
6019 rbd_free_disk(rbd_dev);
6020err_out_blkdev:
6021 if (!single_major)
6022 unregister_blkdev(rbd_dev->major, rbd_dev->name);
6023err_out_unlock:
6024 up_write(&rbd_dev->header_rwsem);
6025 return ret;
6026}
6027
6028static int rbd_dev_header_name(struct rbd_device *rbd_dev)
6029{
6030 struct rbd_spec *spec = rbd_dev->spec;
6031 int ret;
6032
6033 /* Record the header object name for this rbd image. */
6034
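	/*
	 * Format 1 header objects are named "<image name>" + RBD_SUFFIX,
	 * format 2 header objects RBD_HEADER_PREFIX + "<image id>" (both
	 * defined in rbd_types.h).
	 */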
6035 rbd_assert(rbd_image_format_valid(rbd_dev->image_format));
6036 if (rbd_dev->image_format == 1)
6037 ret = ceph_oid_aprintf(&rbd_dev->header_oid, GFP_KERNEL, "%s%s",
6038 spec->image_name, RBD_SUFFIX);
6039 else
6040 ret = ceph_oid_aprintf(&rbd_dev->header_oid, GFP_KERNEL, "%s%s",
6041 RBD_HEADER_PREFIX, spec->image_id);
6042
6043 return ret;
6044}
6045
6046static void rbd_dev_image_release(struct rbd_device *rbd_dev)
6047{
6048 if (rbd_dev->opts)
6049 rbd_unregister_watch(rbd_dev);
6050
6051 rbd_dev_unprobe(rbd_dev);
6052 rbd_dev->image_format = 0;
6053 kfree(rbd_dev->spec->image_id);
6054 rbd_dev->spec->image_id = NULL;
6055}
6056
6057/*
6058 * Probe for the existence of the header object for the given rbd
6059 * device. If this image is the one being mapped (i.e., not a
6060 * parent), initiate a watch on its header object before using that
6061 * object to get detailed information about the rbd image.
6062 *
6063 * On success, returns with header_rwsem held for write if called
6064 * with @depth == 0.
6065 */
6066static int rbd_dev_image_probe(struct rbd_device *rbd_dev, int depth)
6067{
6068 int ret;
6069
6070 /*
6071 * Get the id from the image id object. Unless there's an
6072 * error, rbd_dev->spec->image_id will be filled in with
6073 * a dynamically-allocated string, and rbd_dev->image_format
6074 * will be set to either 1 or 2.
6075 */
6076 ret = rbd_dev_image_id(rbd_dev);
6077 if (ret)
6078 return ret;
6079
6080 ret = rbd_dev_header_name(rbd_dev);
6081 if (ret)
6082 goto err_out_format;
6083
6084 if (!depth) {
6085 ret = rbd_register_watch(rbd_dev);
6086 if (ret) {
6087 if (ret == -ENOENT)
6088 pr_info("image %s/%s does not exist\n",
6089 rbd_dev->spec->pool_name,
6090 rbd_dev->spec->image_name);
6091 goto err_out_format;
6092 }
6093 }
6094
6095 if (!depth)
6096 down_write(&rbd_dev->header_rwsem);
6097
6098 ret = rbd_dev_header_info(rbd_dev);
6099 if (ret)
6100 goto err_out_probe;
6101
6102 /*
6103 * If this image is the one being mapped, we have pool name and
6104 * id, image name and id, and snap name - need to fill snap id.
6105 * Otherwise this is a parent image, identified by pool, image
6106 * and snap ids - need to fill in names for those ids.
6107 */
6108 if (!depth)
6109 ret = rbd_spec_fill_snap_id(rbd_dev);
6110 else
6111 ret = rbd_spec_fill_names(rbd_dev);
6112 if (ret) {
6113 if (ret == -ENOENT)
6114 pr_info("snap %s/%s@%s does not exist\n",
6115 rbd_dev->spec->pool_name,
6116 rbd_dev->spec->image_name,
6117 rbd_dev->spec->snap_name);
6118 goto err_out_probe;
6119 }
6120
6121 if (rbd_dev->header.features & RBD_FEATURE_LAYERING) {
6122 ret = rbd_dev_v2_parent_info(rbd_dev);
6123 if (ret)
6124 goto err_out_probe;
6125
6126 /*
6127 * Need to warn users if this image is the one being
6128 * mapped and has a parent.
6129 */
6130 if (!depth && rbd_dev->parent_spec)
6131 rbd_warn(rbd_dev,
6132 "WARNING: kernel layering is EXPERIMENTAL!");
6133 }
6134
6135 ret = rbd_dev_probe_parent(rbd_dev, depth);
6136 if (ret)
6137 goto err_out_probe;
6138
6139 dout("discovered format %u image, header name is %s\n",
6140 rbd_dev->image_format, rbd_dev->header_oid.name);
6141 return 0;
6142
6143err_out_probe:
6144	if (!depth) {
6145		up_write(&rbd_dev->header_rwsem);
6146		rbd_unregister_watch(rbd_dev);
6147	}
6148 rbd_dev_unprobe(rbd_dev);
6149err_out_format:
6150 rbd_dev->image_format = 0;
6151 kfree(rbd_dev->spec->image_id);
6152 rbd_dev->spec->image_id = NULL;
6153 return ret;
6154}
6155
6156static ssize_t do_rbd_add(struct bus_type *bus,
6157 const char *buf,
6158 size_t count)
6159{
6160 struct rbd_device *rbd_dev = NULL;
6161 struct ceph_options *ceph_opts = NULL;
6162 struct rbd_options *rbd_opts = NULL;
6163 struct rbd_spec *spec = NULL;
6164 struct rbd_client *rbdc;
6165 bool read_only;
6166 int rc;
6167
6168 if (!capable(CAP_SYS_ADMIN))
6169 return -EPERM;
6170
6171 if (!try_module_get(THIS_MODULE))
6172 return -ENODEV;
6173
6174 /* parse add command */
6175 rc = rbd_add_parse_args(buf, &ceph_opts, &rbd_opts, &spec);
6176 if (rc < 0)
6177 goto out;
6178
6179 rbdc = rbd_get_client(ceph_opts);
6180 if (IS_ERR(rbdc)) {
6181 rc = PTR_ERR(rbdc);
6182 goto err_out_args;
6183 }
6184
6185 /* pick the pool */
6186 rc = rbd_add_get_pool_id(rbdc, spec->pool_name);
6187 if (rc < 0) {
6188 if (rc == -ENOENT)
6189 pr_info("pool %s does not exist\n", spec->pool_name);
6190 goto err_out_client;
6191 }
6192 spec->pool_id = (u64)rc;
6193
6194 rbd_dev = rbd_dev_create(rbdc, spec, rbd_opts);
6195 if (!rbd_dev) {
6196 rc = -ENOMEM;
6197 goto err_out_client;
6198 }
6199 rbdc = NULL; /* rbd_dev now owns this */
6200 spec = NULL; /* rbd_dev now owns this */
6201 rbd_opts = NULL; /* rbd_dev now owns this */
6202
6203 rbd_dev->config_info = kstrdup(buf, GFP_KERNEL);
6204 if (!rbd_dev->config_info) {
6205 rc = -ENOMEM;
6206 goto err_out_rbd_dev;
6207 }
6208
6209 rc = rbd_dev_image_probe(rbd_dev, 0);
6210 if (rc < 0)
6211 goto err_out_rbd_dev;
6212
6213	/* If we are mapping a snapshot, it must be marked read-only */
6214
6215 read_only = rbd_dev->opts->read_only;
6216 if (rbd_dev->spec->snap_id != CEPH_NOSNAP)
6217 read_only = true;
6218 rbd_dev->mapping.read_only = read_only;
6219
6220 rc = rbd_dev_device_setup(rbd_dev);
6221 if (rc)
6222 goto err_out_image_probe;
6223
6224 if (rbd_dev->opts->exclusive) {
6225 rc = rbd_add_acquire_lock(rbd_dev);
6226 if (rc)
6227 goto err_out_device_setup;
6228 }
6229
6230 /* Everything's ready. Announce the disk to the world. */
6231
6232 rc = device_add(&rbd_dev->dev);
6233 if (rc)
6234 goto err_out_image_lock;
6235
6236 add_disk(rbd_dev->disk);
6237 /* see rbd_init_disk() */
6238 blk_put_queue(rbd_dev->disk->queue);
6239
6240 spin_lock(&rbd_dev_list_lock);
6241 list_add_tail(&rbd_dev->node, &rbd_dev_list);
6242 spin_unlock(&rbd_dev_list_lock);
6243
6244 pr_info("%s: capacity %llu features 0x%llx\n", rbd_dev->disk->disk_name,
6245 (unsigned long long)get_capacity(rbd_dev->disk) << SECTOR_SHIFT,
6246 rbd_dev->header.features);
6247 rc = count;
6248out:
6249 module_put(THIS_MODULE);
6250 return rc;
6251
6252err_out_image_lock:
6253 rbd_dev_image_unlock(rbd_dev);
6254err_out_device_setup:
6255 rbd_dev_device_release(rbd_dev);
6256err_out_image_probe:
6257 rbd_dev_image_release(rbd_dev);
6258err_out_rbd_dev:
6259 rbd_dev_destroy(rbd_dev);
6260err_out_client:
6261 rbd_put_client(rbdc);
6262err_out_args:
6263 rbd_spec_put(spec);
6264 kfree(rbd_opts);
6265 goto out;
6266}
6267
6268static ssize_t rbd_add(struct bus_type *bus,
6269 const char *buf,
6270 size_t count)
6271{
6272 if (single_major)
6273 return -EINVAL;
6274
6275 return do_rbd_add(bus, buf, count);
6276}
6277
6278static ssize_t rbd_add_single_major(struct bus_type *bus,
6279 const char *buf,
6280 size_t count)
6281{
6282 return do_rbd_add(bus, buf, count);
6283}
6284
6285static void rbd_dev_remove_parent(struct rbd_device *rbd_dev)
6286{
6287 while (rbd_dev->parent) {
6288 struct rbd_device *first = rbd_dev;
6289 struct rbd_device *second = first->parent;
6290 struct rbd_device *third;
6291
6292 /*
6293		 * Walk down to the deepest parent (the one with no
6294		 * grandparent) and remove it.
6295 */
6296 while (second && (third = second->parent)) {
6297 first = second;
6298 second = third;
6299 }
6300 rbd_assert(second);
6301 rbd_dev_image_release(second);
6302 rbd_dev_destroy(second);
6303 first->parent = NULL;
6304 first->parent_overlap = 0;
6305
6306 rbd_assert(first->parent_spec);
6307 rbd_spec_put(first->parent_spec);
6308 first->parent_spec = NULL;
6309 }
6310}
6311
6312static ssize_t do_rbd_remove(struct bus_type *bus,
6313 const char *buf,
6314 size_t count)
6315{
6316 struct rbd_device *rbd_dev = NULL;
6317 struct list_head *tmp;
6318 int dev_id;
6319 char opt_buf[6];
6320 bool force = false;
6321 int ret;
6322
6323 if (!capable(CAP_SYS_ADMIN))
6324 return -EPERM;
6325
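	/*
	 * The input is a device id optionally followed by "force", e.g.
	 * writing "2 force" (hypothetical id) to the sysfs remove file
	 * tears down rbd2 even while it is still open.
	 */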
6326 dev_id = -1;
6327 opt_buf[0] = '\0';
6328 sscanf(buf, "%d %5s", &dev_id, opt_buf);
6329 if (dev_id < 0) {
6330 pr_err("dev_id out of range\n");
6331 return -EINVAL;
6332 }
6333 if (opt_buf[0] != '\0') {
6334 if (!strcmp(opt_buf, "force")) {
6335 force = true;
6336 } else {
6337 pr_err("bad remove option at '%s'\n", opt_buf);
6338 return -EINVAL;
6339 }
6340 }
6341
6342 ret = -ENOENT;
6343 spin_lock(&rbd_dev_list_lock);
6344 list_for_each(tmp, &rbd_dev_list) {
6345 rbd_dev = list_entry(tmp, struct rbd_device, node);
6346 if (rbd_dev->dev_id == dev_id) {
6347 ret = 0;
6348 break;
6349 }
6350 }
6351 if (!ret) {
6352 spin_lock_irq(&rbd_dev->lock);
6353 if (rbd_dev->open_count && !force)
6354 ret = -EBUSY;
6355 else if (test_and_set_bit(RBD_DEV_FLAG_REMOVING,
6356 &rbd_dev->flags))
6357 ret = -EINPROGRESS;
6358 spin_unlock_irq(&rbd_dev->lock);
6359 }
6360 spin_unlock(&rbd_dev_list_lock);
6361 if (ret)
6362 return ret;
6363
6364 if (force) {
6365 /*
6366 * Prevent new IO from being queued and wait for existing
6367 * IO to complete/fail.
6368 */
6369 blk_mq_freeze_queue(rbd_dev->disk->queue);
6370 blk_set_queue_dying(rbd_dev->disk->queue);
6371 }
6372
6373 del_gendisk(rbd_dev->disk);
6374 spin_lock(&rbd_dev_list_lock);
6375 list_del_init(&rbd_dev->node);
6376 spin_unlock(&rbd_dev_list_lock);
6377 device_del(&rbd_dev->dev);
6378
6379 rbd_dev_image_unlock(rbd_dev);
6380 rbd_dev_device_release(rbd_dev);
6381 rbd_dev_image_release(rbd_dev);
6382 rbd_dev_destroy(rbd_dev);
6383 return count;
6384}
6385
6386static ssize_t rbd_remove(struct bus_type *bus,
6387 const char *buf,
6388 size_t count)
6389{
6390 if (single_major)
6391 return -EINVAL;
6392
6393 return do_rbd_remove(bus, buf, count);
6394}
6395
6396static ssize_t rbd_remove_single_major(struct bus_type *bus,
6397 const char *buf,
6398 size_t count)
6399{
6400 return do_rbd_remove(bus, buf, count);
6401}
6402
6403/*
6404 * create control files in sysfs
6405 * /sys/bus/rbd/...
6406 */
6407static int rbd_sysfs_init(void)
6408{
6409 int ret;
6410
6411 ret = device_register(&rbd_root_dev);
6412 if (ret < 0)
6413 return ret;
6414
6415 ret = bus_register(&rbd_bus_type);
6416 if (ret < 0)
6417 device_unregister(&rbd_root_dev);
6418
6419 return ret;
6420}
6421
6422static void rbd_sysfs_cleanup(void)
6423{
6424 bus_unregister(&rbd_bus_type);
6425 device_unregister(&rbd_root_dev);
6426}
6427
6428static int rbd_slab_init(void)
6429{
6430 rbd_assert(!rbd_img_request_cache);
6431 rbd_img_request_cache = KMEM_CACHE(rbd_img_request, 0);
6432 if (!rbd_img_request_cache)
6433 return -ENOMEM;
6434
6435 rbd_assert(!rbd_obj_request_cache);
6436 rbd_obj_request_cache = KMEM_CACHE(rbd_obj_request, 0);
6437 if (!rbd_obj_request_cache)
6438 goto out_err;
6439
6440 rbd_assert(!rbd_bio_clone);
6441 rbd_bio_clone = bioset_create(BIO_POOL_SIZE, 0, 0);
6442 if (!rbd_bio_clone)
6443 goto out_err_clone;
6444
6445 return 0;
6446
6447out_err_clone:
6448 kmem_cache_destroy(rbd_obj_request_cache);
6449 rbd_obj_request_cache = NULL;
6450out_err:
6451 kmem_cache_destroy(rbd_img_request_cache);
6452 rbd_img_request_cache = NULL;
6453 return -ENOMEM;
6454}
6455
6456static void rbd_slab_exit(void)
6457{
6458 rbd_assert(rbd_obj_request_cache);
6459 kmem_cache_destroy(rbd_obj_request_cache);
6460 rbd_obj_request_cache = NULL;
6461
6462 rbd_assert(rbd_img_request_cache);
6463 kmem_cache_destroy(rbd_img_request_cache);
6464 rbd_img_request_cache = NULL;
6465
6466 rbd_assert(rbd_bio_clone);
6467 bioset_free(rbd_bio_clone);
6468 rbd_bio_clone = NULL;
6469}
6470
6471static int __init rbd_init(void)
6472{
6473 int rc;
6474
6475 if (!libceph_compatible(NULL)) {
6476 rbd_warn(NULL, "libceph incompatibility (quitting)");
6477 return -EINVAL;
6478 }
6479
6480 rc = rbd_slab_init();
6481 if (rc)
6482 return rc;
6483
6484 /*
6485 * The number of active work items is limited by the number of
6486 * rbd devices * queue depth, so leave @max_active at default.
6487 */
6488 rbd_wq = alloc_workqueue(RBD_DRV_NAME, WQ_MEM_RECLAIM, 0);
6489 if (!rbd_wq) {
6490 rc = -ENOMEM;
6491 goto err_out_slab;
6492 }
6493
6494 if (single_major) {
6495 rbd_major = register_blkdev(0, RBD_DRV_NAME);
6496 if (rbd_major < 0) {
6497 rc = rbd_major;
6498 goto err_out_wq;
6499 }
6500 }
6501
6502 rc = rbd_sysfs_init();
6503 if (rc)
6504 goto err_out_blkdev;
6505
6506 if (single_major)
6507 pr_info("loaded (major %d)\n", rbd_major);
6508 else
6509 pr_info("loaded\n");
6510
6511 return 0;
6512
6513err_out_blkdev:
6514 if (single_major)
6515 unregister_blkdev(rbd_major, RBD_DRV_NAME);
6516err_out_wq:
6517 destroy_workqueue(rbd_wq);
6518err_out_slab:
6519 rbd_slab_exit();
6520 return rc;
6521}
6522
6523static void __exit rbd_exit(void)
6524{
6525 ida_destroy(&rbd_dev_id_ida);
6526 rbd_sysfs_cleanup();
6527 if (single_major)
6528 unregister_blkdev(rbd_major, RBD_DRV_NAME);
6529 destroy_workqueue(rbd_wq);
6530 rbd_slab_exit();
6531}
6532
6533module_init(rbd_init);
6534module_exit(rbd_exit);
6535
6536MODULE_AUTHOR("Alex Elder <elder@inktank.com>");
6537MODULE_AUTHOR("Sage Weil <sage@newdream.net>");
6538MODULE_AUTHOR("Yehuda Sadeh <yehuda@hq.newdream.net>");
6539/* following authorship retained from original osdblk.c */
6540MODULE_AUTHOR("Jeff Garzik <jeff@garzik.org>");
6541
6542MODULE_DESCRIPTION("RADOS Block Device (RBD) driver");
6543MODULE_LICENSE("GPL");