Friday, August 17, 2007

For a long time I wanted to create a module to allow asynchronous file IO for Stackless Python.

The problem is that the built-in file object makes blocking calls in its read() and write() methods, which blocks the whole interpreter and prevents the other tasklets from doing their jobs.

The solution was to create a module where the tasklet calls the method and only that tasklet gets blocked on a channel, allowing the other tasklets to run freely. When the data has been read or written, a manager sends a message back on the channel and the requesting tasklet continues.

I had already created a module using a thread pool, where the requests are handed to workers that read or write the data and return the result to the calling tasklet. However, this consumes memory for the multiple OS threads running as workers, and you have to configure the number of workers based on your usage.

Fiddling around, I found that on Windows select() only works on sockets, not on regular file descriptors, so the other option was overlapped I/O combined with I/O completion ports (IOCP).

This made me learn a thousand things I thought I would never need to know: ctypes, Windows internal APIs (kernel32 stuff), and even Python internals, to see how its file object works.

After all this, here it is: a module that mimics the built-in Python file() object. Please test it and report any successes or failures. I also made a script to test its usage.

Hope it's useful.

Carlos

The code is below. If it's too difficult to read here, it is also pasted at http://dpaste.com/hold/17151/ and will be posted to the Stackless Examples SVN repository.


#
# Stackless Asynchronous file module:
#
# Author: Carlos E. de Paula <carlosedp@gmail.com>
#
# This code was written to serve as an example of Stackless Python usage.
# Feel free to email me with any questions, comments, or suggestions for
# improvement.
#
# This is an asynchronous file class in order to have a file module replacement
# that uses channels and a windows async API to allow its methods to
# block just the calling tasklet not the entire interpreter.
#
#
import os
import time
import stackless
from ctypes import *
from ctypes.wintypes import HANDLE, ULONG, DWORD, BOOL, LPCSTR, LPCWSTR, WinError

# Everything below talks to kernel32 through ctypes, so refuse to load on
# any platform other than Windows.
if not os.name == 'nt':
    raise ImportError('This module has been implemented for windows systems.')

# Windows structures

class _US(Structure):
_fields_ = [
("Offset", DWORD),
("OffsetHigh", DWORD),
]

class _U(Union):
_fields_ = [
("s", _US),
("Pointer", c_void_p),
]

_anonymous_ = ("s",)

class OVERLAPPED(Structure):
_fields_ = [
("Internal", POINTER(ULONG)),
("InternalHigh", POINTER(ULONG)),
("u", _U),
("hEvent", HANDLE),

# Custom fields.
("taskletID", ULONG),
]

_anonymous_ = ("u",)

# Windows kernel32 API
#
# Each entry point is looked up once and given explicit result and argument
# types, so ctypes converts and checks values at the call boundary.

# Creates a completion port or associates a file handle with an existing one.
CreateIoCompletionPort = windll.kernel32.CreateIoCompletionPort
CreateIoCompletionPort.restype = HANDLE
CreateIoCompletionPort.argtypes = [HANDLE, HANDLE, POINTER(c_ulong), DWORD]

# Dequeues one completion packet from a port (with a timeout in ms).
GetQueuedCompletionStatus = windll.kernel32.GetQueuedCompletionStatus
GetQueuedCompletionStatus.restype = BOOL
GetQueuedCompletionStatus.argtypes = [
    HANDLE,
    POINTER(DWORD),
    POINTER(c_ulong),
    POINTER(POINTER(OVERLAPPED)),
    DWORD,
]

# ReadFile and WriteFile share the same parameter list.
ReadFile = windll.kernel32.ReadFile
ReadFile.restype = BOOL
ReadFile.argtypes = [HANDLE, c_void_p, DWORD, POINTER(DWORD), POINTER(OVERLAPPED)]

WriteFile = windll.kernel32.WriteFile
WriteFile.restype = BOOL
WriteFile.argtypes = [HANDLE, c_void_p, DWORD, POINTER(DWORD), POINTER(OVERLAPPED)]

# ANSI and wide-character variants of CreateFile; they differ only in the
# type of the file name argument.
CreateFileA = windll.kernel32.CreateFileA
CreateFileA.restype = HANDLE
CreateFileA.argtypes = [LPCSTR, DWORD, DWORD, c_void_p, DWORD, DWORD, HANDLE]

CreateFileW = windll.kernel32.CreateFileW
CreateFileW.restype = HANDLE
CreateFileW.argtypes = [LPCWSTR, DWORD, DWORD, c_void_p, DWORD, DWORD, HANDLE]

CloseHandle = windll.kernel32.CloseHandle
CloseHandle.restype = BOOL
CloseHandle.argtypes = [HANDLE]

GetLastError = windll.kernel32.GetLastError

# Python API

pythonapi.PyErr_SetFromErrno.restype = py_object
pythonapi.PyErr_SetFromErrno.argtypes = [py_object]

# Windows definitions

# INVALID_HANDLE_VALUE is (HANDLE)-1, i.e. all bits set at the native pointer
# width.  The literal 0xFFFFFFFF is only correct where HANDLE is 32 bits
# wide; deriving the value from HANDLE keeps it equal to what the HANDLE
# restype of the CreateFile* bindings yields on failure on 64-bit builds too.
INVALID_HANDLE_VALUE = HANDLE(-1).value
NULL = c_ulong()

# Result / error codes
WAIT_TIMEOUT = 0x102
ERROR_IO_PENDING = 997

# CreateFile flags
FILE_FLAG_RANDOM_ACCESS = 0x10000000
FILE_FLAG_OVERLAPPED = 0x40000000

# Access rights
GENERIC_READ = 0x80000000
GENERIC_WRITE = 0x40000000
FILE_APPEND_DATA = 0x00000004

# Share modes
FILE_SHARE_READ = 0x00000001
FILE_SHARE_WRITE = 0x00000002

# Creation dispositions
OPEN_EXISTING = 3
OPEN_ALWAYS = 4
CREATE_ALWAYS = 2

# ----------------------------------------------------------------------------

class resultsManager(object):
    """
    Dequeues I/O completion packets and wakes the tasklets waiting on them.

    The manager owns one I/O completion port.  Code that issues an
    overlapped request stores an ID in the ``taskletID`` field of its
    OVERLAPPED structure and registers a channel under that ID with
    RegisterChannelObject().  poll(), running in its own tasklet, dequeues
    completion packets from the port, looks the ID up, sends the
    transferred byte count (a DWORD instance) on the registered channel and
    drops the registration.
    """

    def __init__(self, numThreads=NULL):
        """
        Create the completion port and start the polling tasklet.

        numThreads is passed to CreateIoCompletionPort as its last
        argument.  Raises WindowsError if the port cannot be created.
        """
        self.running = True
        # Set up every attribute before the poll tasklet exists and before
        # anything can raise, so poll() and __del__ always see a complete
        # object.
        self.overlappedByID = {}
        self.numThreads = numThreads
        self.handle = None

        port = CreateIoCompletionPort(INVALID_HANDLE_VALUE, NULL,
                                      NULL, numThreads)
        # The binding's restype is HANDLE (a c_void_p), for which ctypes
        # returns None rather than 0 on a NULL result, so test truthiness.
        if not port:
            raise WinError()
        self.handle = port

        stackless.tasklet(self.poll)()

    def __del__(self):
        handle = getattr(self, 'handle', None)
        if handle is None:
            return
        self.overlappedByID.clear()
        CloseHandle(handle)
        self.handle = None

    def poll(self, timeout=1):
        """
        Dispatch completion packets until no registrations remain.

        timeout is the wait, in milliseconds, handed to
        GetQueuedCompletionStatus on every iteration.  When the wait times
        out the tasklet yields to the scheduler.  On exit ``running`` is set
        to False so that callers can tell a new poll tasklet is needed.
        """
        while self.running and self.overlappedByID:
            numBytes = DWORD()
            completionKey = c_ulong()
            ovp = POINTER(OVERLAPPED)()

            ret = GetQueuedCompletionStatus(self.handle, byref(numBytes),
                                            byref(completionKey), byref(ovp),
                                            timeout)

            if not ovp:
                # Nothing was dequeued, so there is no OVERLAPPED to look
                # at.  Anything other than a timeout is a failure of the
                # port itself: report it instead of dereferencing NULL.
                if ret == 0:
                    err = GetLastError()
                    if err != WAIT_TIMEOUT:
                        self.running = False
                        raise WinError(err)
                stackless.schedule()
                continue

            taskletID = ovp.contents.taskletID

            # Drop the registration *before* sending.  The woken tasklet
            # may run and register a new request under the same ID before
            # send() returns; unregistering afterwards would delete that
            # new registration and leave its owner blocked forever.
            c = self.overlappedByID.pop(taskletID, None)
            if c is None:
                # Packet for a request nobody is waiting on.
                continue

            # A packet dequeued with ret == 0 belongs to a failed request;
            # the waiter is still woken and receives the reported count.
            c.send(numBytes)

        self.running = False

    def RegisterChannelObject(self, ob, c):
        """Remember channel c as the one to notify for request ID ob."""
        self.overlappedByID[ob] = c

    def UnregisterChannelObject(self, ob):
        """Forget the channel registered under ob; unknown IDs are ignored."""
        self.overlappedByID.pop(ob, None)


# Module-level manager instance; the stacklessfile methods below register
# their channels with it and associate their handles with its port.
mng = resultsManager()

class stacklessfile(object):
    """
    stacklessfile(name[, mode[, buffering]]) -> stackless file object

    This class creates a new file module permitting nonblocking IO calls
    for tasklets using windows IOCP functionality.
    When a read or write operation is called, only the calling tasklet is
    blocked. In standard file module, the whole interpreter gets blocked
    until the operation is concluded.

    Open a file. The mode can be 'r', 'w' or 'a' for reading (default),
    writing or appending. The file will be created if it doesn't exist
    when opened for writing or appending; it will be truncated when
    opened for writing. Add a 'b' to the mode for binary files.
    Add a '+' to the mode to allow simultaneous reading and writing.

    The buffering argument is accepted for compatibility with file() but
    is not used, and no newline translation is performed ('U' has no
    effect).

    The object keeps the state of the request in flight on itself
    (``o``, ``buffer``, ``channel``), so a single instance must not be used
    for two overlapping read()/write() calls from different tasklets.
    """

    closed = True

    def __init__(self, name, mode='r', buffering=-1):
        """
        Initializes the file object and opens the underlying file handle.
        """

        if not self.closed:
            self.close()
        self.name = name
        self.mode = mode
        self.offset = 0
        self.iocpLinked = False
        self._check_manager()
        self.open_handle()
        self.closed = False

    def __repr__(self):
        state = ["open", "closed"][self.closed]
        return "<%s file '%s', mode '%s' at 0x%08X>" % (
            state, self.name, self.mode, id(self))

    def __del__(self):
        self.close()

    def _check_manager(self):
        # The manager's poll tasklet exits when nothing is registered;
        # start a new one whenever a request is about to be issued.
        if not mng.running:
            stackless.tasklet(mng.poll)()
            mng.running = True

    def _check_still_open(self):
        if self.closed:
            raise ValueError("I/O operation on closed file")

    def _ensure_iocp_association(self):
        # Associate the file handle with the manager's port on first use.
        if not self.iocpLinked:
            port = CreateIoCompletionPort(self.handle, mng.handle,
                                          NULL, mng.numThreads)
            if not port:
                # Without the association no completion packet would ever
                # arrive and the caller would block forever.
                raise WinError()
            self.iocpLinked = True

    def _start_request(self):
        # Prepare the per-request state shared by read() and write().
        self.o = OVERLAPPED()
        # OVERLAPPED carries the 64-bit file position as two DWORDs.
        self.o.Offset = self.offset & 0xFFFFFFFF
        self.o.OffsetHigh = self.offset >> 32
        self.o.taskletID = id(self)
        self.channel = stackless.channel()
        self._ensure_iocp_association()
        self._check_manager()

    def _finish_request(self, result):
        # Wait for the completion of the request just issued and return the
        # DWORD byte count sent by the manager.
        if result == 0:
            # Fetch the error code once, right after the failed call.
            err = GetLastError()
            if err != ERROR_IO_PENDING:
                raise IOError(err, FormatError(err), self.name)

        # A handle associated with a completion port gets a completion
        # packet for every request that succeeds or is pending, including
        # one that completed synchronously.  Always consume that packet;
        # otherwise it stays queued and would wake the *next* request made
        # through this object (same ID) before its own I/O has finished.
        mng.RegisterChannelObject(self.o.taskletID, self.channel)
        return self.channel.receive()

    def close(self):
        """
        close() -> None. Close the file.

        Sets data attribute .closed to True. A closed file cannot be used
        for further I/O operations. close() may be called more than once
        without error.
        """

        if not self.closed:
            CloseHandle(self.handle)
            del self.handle
            self.closed = True

    def open_handle(self):
        """
        Open the Windows file handle described by self.name and self.mode.

        Raises WindowsError if CreateFile fails.
        """
        mode = self.mode
        self.binary = 'b' in mode
        updating = 'r' in mode and '+' in mode

        access = GENERIC_READ
        if 'w' in mode or updating:
            access |= GENERIC_WRITE
        if 'a' in mode:
            access |= FILE_APPEND_DATA

        share = FILE_SHARE_READ | FILE_SHARE_WRITE

        if 'w' in mode:
            disposition = CREATE_ALWAYS
        elif 'a' in mode or updating:
            # Appending must create a missing file, as the class docstring
            # promises; it previously fell through to OPEN_EXISTING.
            disposition = OPEN_ALWAYS
        else:
            disposition = OPEN_EXISTING

        flags = FILE_FLAG_RANDOM_ACCESS | FILE_FLAG_OVERLAPPED

        if isinstance(self.name, unicode):
            func = CreateFileW
        else:
            func = CreateFileA

        self.handle = func(self.name, access,
                           share, c_void_p(), disposition,
                           flags, NULL)

        if self.handle == INVALID_HANDLE_VALUE:
            raise WinError()

        self.iocpLinked = False

    def read(self, size=None):
        """
        read([size]) -> read at most size bytes, returned as a string.

        If size is omitted, None or negative, read up to the current size
        of the file. Only the calling tasklet blocks while the data is
        being transferred. Raises IOError if the request cannot be started.
        """

        self._check_still_open()
        remaining = int(os.path.getsize(self.name)) - self.offset
        if size is None or size < 0 or remaining < size:
            size = remaining
        if size <= 0:
            # At or past the end of the file: nothing to transfer.
            return ''

        bytesRead = DWORD()
        self.buffer = create_string_buffer(size)
        self._start_request()

        r = ReadFile(self.handle, self.buffer,
                     size, byref(bytesRead), byref(self.o))
        count = self._finish_request(r).value

        # Use the count reported by the completion rather than the
        # requested size; the file may have shrunk since it was measured.
        self.offset += count
        return self.buffer[:count]

    def write(self, data):
        """
        write(str) -> None. Write string str to file.

        Only the calling tasklet blocks while the data is being
        transferred. Raises IOError if the request cannot be started and
        WindowsError if fewer bytes than requested were written.
        """

        self._check_still_open()
        bytesToWrite = c_int()
        writeBufferPtr = c_char_p()
        bytesWritten = DWORD()

        # Let the interpreter's own argument parser obtain a pointer to the
        # bytes of data and their length.  The pointer refers to memory
        # owned by data, which stays referenced by this frame until the
        # write has completed.
        fmt = "s#" if self.binary else "t#"
        ret = pythonapi.PyArg_ParseTuple(py_object((data,)), c_char_p(fmt),
                                         byref(writeBufferPtr),
                                         byref(bytesToWrite))
        if ret == 0:
            # NOTE(review): pythonapi calls normally re-raise the Python
            # exception set by the callee, so this is a safety net only.
            raise TypeError("write() argument must be a string or buffer")

        if bytesToWrite.value == 0:
            return

        self._start_request()

        r = WriteFile(self.handle, writeBufferPtr,
                      bytesToWrite.value, byref(bytesWritten), byref(self.o))
        written = self._finish_request(r)

        if bytesToWrite.value != written.value:
            # Check if the quantity of bytes sent has been written to the file
            raise WinError()

        # Advance the position so that the next write continues after this
        # one instead of overwriting it.
        self.offset += written.value

    def tell(self):
        """
        tell() -> current file position, an integer (may be a long integer).
        """

        return self.offset

    def seek(self, offset, whence=os.SEEK_SET):
        """
        seek(offset[, whence]) -> None. Move to new file position.

        Argument offset is a byte count. Optional argument whence defaults to
        0 (offset from start of file, offset should be >= 0); other values are 1
        (move relative to current position, positive or negative), and 2 (move
        relative to end of file, usually negative). The end of the file is
        taken from the file's current size on disk. Any other whence value
        leaves the position unchanged.
        """

        self._check_still_open()
        if whence == os.SEEK_SET:
            self.offset = offset
        elif whence == os.SEEK_CUR:
            self.offset += offset
        elif whence == os.SEEK_END:
            # Same size source that read() uses to find the end of the file.
            self.offset = int(os.path.getsize(self.name)) + offset

    def flush(self):
        """
        flush() -> None. Does nothing; this class keeps no write buffer.
        """
        pass

    def isatty(self):
        """
        isatty() -> true or false. True if the file is connected to a tty device.
        """

        self._check_still_open()
        return False


# Demo / smoke test: copies two generated files many times concurrently,
# one tasklet per copy, and reports the elapsed time.
if __name__ == '__main__':
    import time
    import glob
    import os
    stdfile = file

    # On your stackless apps, use these 2 lines below:
    #
    #     from stacklessfile import stacklessfile as file
    #     open = file
    #
    # When this file itself is the script, the class is already defined
    # here.  Importing the module by name would execute it a second time
    # (creating a second manager and completion port) and would fail
    # outright if the file is not saved as stacklessfile.py, so bind the
    # local class instead.
    file = stacklessfile
    open = file

    # Function to copy a file
    def copyfile(who, infile, out):
        st = time.time()
        f1 = file(infile, 'rb')
        f2 = file(out, 'wb')
        print("%s started reading %s ..." % (who, infile))
        a = f1.read()
        print("%s started writing %s -> %s ..." % (who, infile, out))
        f2.write(a)
        f1.close()
        f2.close()
        print("Finished tasklet %s (%s) in %s" % (who, infile, time.time()-st))

    # Creating two dummy files (with the regular, blocking file object)
    newfile = stdfile('test-small.txt', 'w')
    for x in xrange(10000):
        newfile.write(str(x))
    newfile.close()

    newfile2 = stdfile('test-big.txt', 'w')
    for x in xrange(500000):
        newfile2.write(str(x))
    newfile2.close()

    # Launching tasklets to perform the file copy
    for i in xrange(1, 11):
        stackless.tasklet(copyfile)(i, 'test-big.txt', 'big%s.txt' % i)

    for i in xrange(1, 21):
        stackless.tasklet(copyfile)(i, 'test-small.txt', 'sm%s.txt' % i)

    st = time.time()
    stackless.run()
    print("Total time is %s seconds." % (time.time() - st))

    # Cleanup all test files used
    for pattern in ('test*.txt', 'sm*.txt', 'big*.txt'):
        for f in glob.glob(pattern):
            os.unlink(f)

3 comments:

Arnar Birgisson said...

This is excellent work.

Anonymous said...

did you fix SSL yet?

Anonymous said...

Who knows where to download XRumer 5.0 Palladium?
Help, please. All recommend this program to effectively advertise on the Internet, this is the best program!

Post a Comment