From: Suparna Bhattacharya <suparna@in.ibm.com>

OK, I have been playing with a patch to retry-enable the O_SYNC speedup
code.  This should get us O_SYNC support for aio (for ext2 at least).

DESC
AIO: fix a BUG
EDESC
From: Suparna Bhattacharya <suparna@in.ibm.com>

Might fix a BUG which was seen in testing.
DESC
Unify o_sync changes for aio and regular writes
EDESC
From: Suparna Bhattacharya <suparna@in.ibm.com>

The current implementation of O_SYNC support for AIO calls sync_page_range
explicitly in the high level aio write handlers after invoking the fops
aio_write method.  This is a duplication of logic across sync i/o and aio (not
good from a maintenance perspective, and more error prone, e.g.  it forgets to
check for O_DIRECT).

More importantly, it doesn't take into account the possibility that different
filesystems may have their own O_SYNC implementations in their corresponding
aio methods.  This is a problem for example in the case of ext3 which may
commit the journal after data and meta-data are written out.

This patch fixes the above problems by moving the call to sync_page_range out
of the high level aio_pwrite handler, and back into generic_file_aio_write
for both aio and sync i/o.  It takes care to make sure that sync_page_range
is called only after all the data is copied in the AIO case, rather than on
every retry.

DESC
aio-O_SYNC-fix bits got lost
EDESC
From: Suparna Bhattacharya <suparna@in.ibm.com>

Patch containing just the missing hunks 

DESC
aio: writev nr_segs fix
EDESC
From: Suparna Bhattacharya <suparna@in.ibm.com>

I'm not sure if this would help here, but there is one bug which I just
spotted which would affect writev from XFS.  I wasn't passing the nr_segs
down properly.

DESC
More AIO O_SYNC related fixes
EDESC
From: Suparna Bhattacharya <suparna@in.ibm.com>

The attached patch plugs some inaccuracies and associated
inefficiencies in sync_page_range handling for AIO 
O_SYNC writes.

- Correctly propagate the return value from sync_page_range
  when some but not all pages in the range have completed
  writeback (Otherwise, O_SYNC AIO writes could report
  completion before all the relevant writeouts actually 
  completed)
- Reset and check page dirty settings prior to attempting
  writeouts to avoid blocking synchronously for a writeback
  initiated otherwise.
- Disable repeated calls to writeout or generic_osync_inode
  during AIO retries for the same iocb.
- Don't block synchronously for data writeouts to complete
  when writing out inode state for generic_osync_inode in
  AIO context. The wait_on_page_range call after this will
  take care of completing the iocb only after the data has
  been written out. [TBD: Is this particular change OK for 
  data=ordered, or does it need rethinking ?]

I have run aio-stress with modifications for AIO O_SYNC 




diff -upN reference/fs/aio.c current/fs/aio.c
--- reference/fs/aio.c	2004-05-01 18:15:06.000000000 -0700
+++ current/fs/aio.c	2004-05-01 18:18:41.000000000 -0700
@@ -27,6 +27,7 @@
 #include <linux/aio.h>
 #include <linux/highmem.h>
 #include <linux/workqueue.h>
+#include <linux/writeback.h>
 
 #include <asm/kmap_types.h>
 #include <asm/uaccess.h>
@@ -1303,28 +1304,40 @@ static ssize_t aio_pread(struct kiocb *i
 static ssize_t aio_pwrite(struct kiocb *iocb)
 {
 	struct file *file = iocb->ki_filp;
+	struct address_space *mapping = file->f_mapping;
+	struct inode *inode = mapping->host;
 	ssize_t ret = 0;
 
 	ret = file->f_op->aio_write(iocb, iocb->ki_buf,
-		iocb->ki_left, iocb->ki_pos);
+				iocb->ki_left, iocb->ki_pos);
 
 	/*
-	 * TBD: Even if iocb->ki_left = 0, could we need to
-	 * wait for data to be sync'd ? Or can we assume
-	 * that aio_fdsync/aio_fsync would be called explicitly
-	 * as required.
+	 * Even if iocb->ki_left = 0, we may need to wait
+	 * for a balance_dirty_pages to complete
 	 */
 	if (ret > 0) {
-		iocb->ki_buf += ret;
+		iocb->ki_buf += iocb->ki_buf ? ret : 0;
 		iocb->ki_left -= ret;
 
 		ret = -EIOCBRETRY;
 	}
 
 	/* This means we must have transferred all that we could */
-	/* No need to retry anymore */
-	if (ret == 0)
+	/* No need to retry anymore unless we need to osync data */
+	if (ret == 0) {
 		ret = iocb->ki_nbytes - iocb->ki_left;
+		if (!iocb->ki_buf)
+			return ret;
+
+		/* Set things up for potential O_SYNC */
+		if (ret > 0 && ((file->f_flags & O_SYNC) || IS_SYNC(inode))) {
+			iocb->ki_buf = NULL;
+			iocb->ki_pos -= ret; /* back up fpos */
+			iocb->ki_left = ret; /* sync what we have written out */
+			iocb->ki_nbytes = ret;
+			ret = -EIOCBRETRY;
+		}
+	}
 
 	return ret;
 }
diff -upN reference/include/linux/writeback.h current/include/linux/writeback.h
--- reference/include/linux/writeback.h	2004-05-01 18:15:06.000000000 -0700
+++ current/include/linux/writeback.h	2004-05-01 18:18:41.000000000 -0700
@@ -92,8 +92,10 @@ void page_writeback_init(void);
 void balance_dirty_pages_ratelimited(struct address_space *mapping);
 int pdflush_operation(void (*fn)(unsigned long), unsigned long arg0);
 int do_writepages(struct address_space *mapping, struct writeback_control *wbc);
-int sync_page_range(struct inode *inode, struct address_space *mapping,
+ssize_t sync_page_range(struct inode *inode, struct address_space *mapping,
 			loff_t pos, size_t count);
+ssize_t sync_page_range_nolock(struct inode *inode, struct address_space
+		*mapping, loff_t pos, size_t count);
 
 /* pdflush.c */
 extern int nr_pdflush_threads;	/* Global so it can be exported to sysctl
diff -upN reference/include/linux/aio.h current/include/linux/aio.h
--- reference/include/linux/aio.h	2004-05-01 18:11:25.000000000 -0700
+++ current/include/linux/aio.h	2004-05-01 18:18:41.000000000 -0700
@@ -29,21 +29,26 @@ struct kioctx;
 #define KIF_LOCKED		0
 #define KIF_KICKED		1
 #define KIF_CANCELLED		2
+#define KIF_SYNCED		3
 
 #define kiocbTryLock(iocb)	test_and_set_bit(KIF_LOCKED, &(iocb)->ki_flags)
 #define kiocbTryKick(iocb)	test_and_set_bit(KIF_KICKED, &(iocb)->ki_flags)
+#define kiocbTrySync(iocb)	test_and_set_bit(KIF_SYNCED, &(iocb)->ki_flags)
 
 #define kiocbSetLocked(iocb)	set_bit(KIF_LOCKED, &(iocb)->ki_flags)
 #define kiocbSetKicked(iocb)	set_bit(KIF_KICKED, &(iocb)->ki_flags)
 #define kiocbSetCancelled(iocb)	set_bit(KIF_CANCELLED, &(iocb)->ki_flags)
+#define kiocbSetSynced(iocb)	set_bit(KIF_SYNCED, &(iocb)->ki_flags)
 
 #define kiocbClearLocked(iocb)	clear_bit(KIF_LOCKED, &(iocb)->ki_flags)
 #define kiocbClearKicked(iocb)	clear_bit(KIF_KICKED, &(iocb)->ki_flags)
 #define kiocbClearCancelled(iocb)	clear_bit(KIF_CANCELLED, &(iocb)->ki_flags)
+#define kiocbClearSynced(iocb)	clear_bit(KIF_SYNCED, &(iocb)->ki_flags)
 
 #define kiocbIsLocked(iocb)	test_bit(KIF_LOCKED, &(iocb)->ki_flags)
 #define kiocbIsKicked(iocb)	test_bit(KIF_KICKED, &(iocb)->ki_flags)
 #define kiocbIsCancelled(iocb)	test_bit(KIF_CANCELLED, &(iocb)->ki_flags)
+#define kiocbIsSynced(iocb)	test_bit(KIF_SYNCED, &(iocb)->ki_flags)
 
 struct kiocb {
 	struct list_head	ki_run_list;
diff -upN reference/mm/filemap.c current/mm/filemap.c
--- reference/mm/filemap.c	2004-05-01 18:15:06.000000000 -0700
+++ current/mm/filemap.c	2004-05-01 18:18:41.000000000 -0700
@@ -973,22 +973,19 @@ __generic_file_aio_read(struct kiocb *io
 out:
 	return retval;
 }
-
 EXPORT_SYMBOL(__generic_file_aio_read);
 
-ssize_t
-generic_file_aio_read(struct kiocb *iocb, char __user *buf, size_t count, loff_t pos)
+ssize_t generic_file_aio_read(struct kiocb *iocb, char __user *buf,
+				size_t count, loff_t pos)
 {
 	struct iovec local_iov = { .iov_base = buf, .iov_len = count };
 
-	BUG_ON(iocb->ki_pos != pos);
 	return __generic_file_aio_read(iocb, &local_iov, 1, &iocb->ki_pos);
 }
-
 EXPORT_SYMBOL(generic_file_aio_read);
 
-ssize_t
-generic_file_read(struct file *filp, char __user *buf, size_t count, loff_t *ppos)
+ssize_t generic_file_read(struct file *filp, char __user *buf,
+				size_t count, loff_t *ppos)
 {
 	struct iovec local_iov = { .iov_base = buf, .iov_len = count };
 	struct kiocb kiocb;
@@ -1000,10 +997,10 @@ generic_file_read(struct file *filp, cha
 		ret = wait_on_sync_kiocb(&kiocb);
 	return ret;
 }
-
 EXPORT_SYMBOL(generic_file_read);
 
-int file_send_actor(read_descriptor_t * desc, struct page *page, unsigned long offset, unsigned long size)
+int file_send_actor(read_descriptor_t * desc, struct page *page,
+				unsigned long offset, unsigned long size)
 {
 	ssize_t written;
 	unsigned long count = desc->count;
@@ -1858,7 +1855,7 @@ EXPORT_SYMBOL(generic_write_checks);
  *							okir@monad.swb.de
  */
 ssize_t
-generic_file_aio_write_nolock(struct kiocb *iocb, const struct iovec *iov,
+__generic_file_aio_write_nolock(struct kiocb *iocb, const struct iovec *iov,
 				unsigned long nr_segs, loff_t *ppos)
 {
 	struct file *file = iocb->ki_filp;
@@ -2039,7 +2036,7 @@ generic_file_aio_write_nolock(struct kio
 	 */
 	if (likely(status >= 0)) {
 		if (unlikely((file->f_flags & O_SYNC) || IS_SYNC(inode))) {
-			if (!a_ops->writepage || !is_sync_kiocb(iocb))
+			if (!a_ops->writepage)
 				status = generic_osync_inode(inode, mapping,
 						OSYNC_METADATA|OSYNC_DATA);
 		}
@@ -2064,6 +2061,55 @@ out:
 EXPORT_SYMBOL(generic_file_aio_write_nolock);
 
 ssize_t
+generic_file_aio_write_nolock(struct kiocb *iocb, const struct iovec *iov,
+				unsigned long nr_segs, loff_t *ppos)
+{
+	struct file *file = iocb->ki_filp;
+	struct address_space *mapping = file->f_mapping;
+	struct inode *inode = mapping->host;
+	ssize_t ret;
+	loff_t pos = *ppos;
+
+	if (!iov->iov_base && !is_sync_kiocb(iocb)) {
+		/* nothing to transfer, may just need to sync data */
+		ret = iov->iov_len; /* vector AIO not supported yet */
+		goto osync;
+	}
+
+	ret = __generic_file_aio_write_nolock(iocb, iov, nr_segs, ppos);
+
+	/*
+	 * Avoid doing a sync in parts for aio - its more efficient to
+	 * call in again after all the data has been copied
+	 */
+	if (!is_sync_kiocb(iocb))
+		return ret;
+
+osync:
+	if (ret > 0 && ((file->f_flags & O_SYNC) || IS_SYNC(inode))) {
+		ret = sync_page_range_nolock(inode, mapping, pos, ret);
+		if (ret >= 0)
+			*ppos = pos + ret;
+	}
+	return ret;
+}
+
+
+ssize_t
+__generic_file_write_nolock(struct file *file, const struct iovec *iov,
+				unsigned long nr_segs, loff_t *ppos)
+{
+	struct kiocb kiocb;
+	ssize_t ret;
+
+	init_sync_kiocb(&kiocb, file);
+	ret = __generic_file_aio_write_nolock(&kiocb, iov, nr_segs, ppos);
+	if (-EIOCBQUEUED == ret)
+		ret = wait_on_sync_kiocb(&kiocb);
+	return ret;
+}
+
+ssize_t
 generic_file_write_nolock(struct file *file, const struct iovec *iov,
 				unsigned long nr_segs, loff_t *ppos)
 {
@@ -2089,19 +2135,29 @@ ssize_t generic_file_aio_write(struct ki
 	struct iovec local_iov = { .iov_base = (void __user *)buf,
 					.iov_len = count };
 
-	BUG_ON(iocb->ki_pos != pos);
+	if (!buf && !is_sync_kiocb(iocb)) {
+		/* nothing to transfer, may just need to sync data */
+		ret = count;
+		goto osync;
+	}
 
 	down(&inode->i_sem);
-	ret = generic_file_aio_write_nolock(iocb, &local_iov, 1,
+	ret = __generic_file_aio_write_nolock(iocb, &local_iov, 1,
 						&iocb->ki_pos);
 	up(&inode->i_sem);
 
-	if (ret > 0 && ((file->f_flags & O_SYNC) || IS_SYNC(inode))) {
-		ssize_t err;
+	/*
+	 * Avoid doing a sync in parts for aio - its more efficient to
+	 * call in again after all the data has been copied
+	 */
+	if (!is_sync_kiocb(iocb))
+		return ret;
 
-		err = sync_page_range(inode, mapping, pos, ret);
-		if (err < 0)
-			ret = err;
+osync:
+	if (ret > 0 && ((file->f_flags & O_SYNC) || IS_SYNC(inode))) {
+		ret = sync_page_range(inode, mapping, pos, ret);
+		if (ret >= 0)
+			iocb->ki_pos = pos + ret;
 	}
 	return ret;
 }
@@ -2117,7 +2173,7 @@ ssize_t generic_file_write(struct file *
 					.iov_len = count };
 
 	down(&inode->i_sem);
-	ret = generic_file_write_nolock(file, &local_iov, 1, ppos);
+	ret = __generic_file_write_nolock(file, &local_iov, 1, ppos);
 	up(&inode->i_sem);
 
 	if (ret > 0 && ((file->f_flags & O_SYNC) || IS_SYNC(inode))) {
@@ -2154,11 +2210,11 @@ ssize_t generic_file_writev(struct file 
 	ssize_t ret;
 
 	down(&inode->i_sem);
-	ret = generic_file_write_nolock(file, iov, nr_segs, ppos);
+	ret = __generic_file_write_nolock(file, iov, nr_segs, ppos);
 	up(&inode->i_sem);
 
 	if (ret > 0 && ((file->f_flags & O_SYNC) || IS_SYNC(inode))) {
-		int err;
+		ssize_t err;
 
 		err = sync_page_range(inode, mapping, *ppos - ret, ret);
 		if (err < 0)
diff -upN reference/mm/page-writeback.c current/mm/page-writeback.c
--- reference/mm/page-writeback.c	2004-05-01 18:18:15.000000000 -0700
+++ current/mm/page-writeback.c	2004-05-01 18:18:41.000000000 -0700
@@ -771,16 +771,19 @@ int mapping_tagged(struct address_space 
 }
 EXPORT_SYMBOL(mapping_tagged);
 
-static int operate_on_page_range(struct address_space *mapping,
+static ssize_t operate_on_page_range(struct address_space *mapping,
 		loff_t pos, size_t count, int (*operator)(struct page *))
 {
 	pgoff_t first = pos >> PAGE_CACHE_SHIFT;
 	pgoff_t last = (pos + count - 1) >> PAGE_CACHE_SHIFT;	/* inclusive */
 	pgoff_t next = first;
 	struct pagevec pvec;
-	ssize_t ret = 0;
+	ssize_t ret = 0, bytes = 0;
 	int i;
 
+	if (count == 0)
+		return 0;
+
 	pagevec_init(&pvec, 0);
 	while (pagevec_lookup(&pvec, mapping, next,
 				min((pgoff_t)PAGEVEC_SIZE, last - next + 1))) {
@@ -795,6 +798,8 @@ static int operate_on_page_range(struct 
 			}
 			next = page->index + 1;
 			ret = (*operator)(page);
+			if (ret == -EIOCBRETRY)
+				break;
 			if (PageError(page)) {
 				if (!ret)
 					ret = -EIO;
@@ -803,20 +808,22 @@ static int operate_on_page_range(struct 
 				break;
 		}
 		pagevec_release(&pvec);
-		if (next > last)
+		if ((next > last) || (ret == -EIOCBRETRY))
 			break;
 	}
-	return ret;
+	bytes = (next << PAGE_CACHE_SHIFT) - pos;
+	if (bytes > count)
+		bytes = count;
+	return (bytes && (!ret || (ret == -EIOCBRETRY))) ? bytes : ret;
 }
 
 static int page_waiter(struct page *page)
 {
 	unlock_page(page);
-	wait_on_page_writeback(page);
-	return 0;
+	return wait_on_page_writeback_wq(page, current->io_wait);
 }
 
-static int
+static size_t
 wait_on_page_range(struct address_space *mapping, loff_t pos, size_t count)
 {
 	return operate_on_page_range(mapping, pos, count, page_waiter);
@@ -829,11 +836,15 @@ static int page_writer(struct page *page
 		.nr_to_write	= 1,
 	};
 
+	if (!test_clear_page_dirty(page)) {
+		unlock_page(page);
+		return 0;
+	}
 	wait_on_page_writeback(page);
 	return page->mapping->a_ops->writepage(page, &wbc);
 }
 
-static int
+static ssize_t
 write_out_page_range(struct address_space *mapping, loff_t pos, size_t count)
 {
 	return operate_on_page_range(mapping, pos, count, page_writer);
@@ -847,22 +858,58 @@ write_out_page_range(struct address_spac
  * We need to re-take i_sem during the generic_osync_inode list walk because
  * it is otherwise livelockable.
  */
-int sync_page_range(struct inode *inode, struct address_space *mapping,
+ssize_t sync_page_range(struct inode *inode, struct address_space *mapping,
 			loff_t pos, size_t count)
 {
-	int ret;
+	int ret = 0;
 
+	if (in_aio()) {
+		/* Already issued writeouts for this iocb ? */
+		if (kiocbTrySync(io_wait_to_kiocb(current->io_wait)))
+			goto do_wait; /* just need to check if done */
+	}
 	if (!mapping->a_ops->writepage)
 		return 0;
 	if (mapping->backing_dev_info->memory_backed)
 		return 0;
 	ret = write_out_page_range(mapping, pos, count);
-	if (ret == 0) {
+	if (ret >= 0) {
 		down(&inode->i_sem);
 		ret = generic_osync_inode(inode, mapping, OSYNC_METADATA);
 		up(&inode->i_sem);
 	}
-	if (ret == 0)
+do_wait:
+	if (ret >= 0)
+		ret = wait_on_page_range(mapping, pos, count);
+	return ret;
+}
+
+/*
+ * It is really better to use sync_page_range, rather than call
+ * sync_page_range_nolock while holding i_sem, if you don't
+ * want to block parallel O_SYNC writes until the pages in this
+ * range are written out.
+ */
+ssize_t sync_page_range_nolock(struct inode *inode, struct address_space
+	*mapping, loff_t pos, size_t count)
+{
+	ssize_t ret = 0;
+
+	if (in_aio()) {
+		/* Already issued writeouts for this iocb ? */
+		if (kiocbTrySync(io_wait_to_kiocb(current->io_wait)))
+			goto do_wait; /* just need to check if done */
+	}
+	if (!mapping->a_ops->writepage)
+		return 0;
+	if (mapping->backing_dev_info->memory_backed)
+		return 0;
+	ret = write_out_page_range(mapping, pos, count);
+	if (ret >= 0) {
+		ret = generic_osync_inode(inode, mapping, OSYNC_METADATA);
+	}
+do_wait:
+	if (ret >= 0)
 		ret = wait_on_page_range(mapping, pos, count);
 	return ret;
 }