For a long time I wanted to create a module to allow asynchronous file IO for Stackless Python.
The problem is that the currently available file object makes blocking calls in its read() and write() methods, which blocks the whole interpreter and prevents the other tasklets from doing their jobs.
The solution was to create a module where the tasklet calls the method and only that tasklet gets blocked on a channel, allowing the other tasklets to run freely. When the data has been read or written, a manager sends a message back through the channel and the requesting tasklet continues.
I have already created a module using a thread pool where the requests are fired, the workers read or write the data and return it to the calling tasklet, but this consumes memory for the multiple OS threads running as workers, and you have to configure the number of workers based on your usage.
Fiddling around, I found that on Windows select() doesn't support file descriptors, so the other option was overlapped I/O combined with I/O completion ports (IOCP).
This made me learn a thousand things I thought I would never know... ctypes, Windows internal APIs (kernel32 stuff), and even Python internals to see how its file object works.
After all this, here it is: the module that mimics the internal Python file() object. Please test it and report any successes or failures. I also made a script to test its usage.
Hope it's useful.
Carlos
The code is below; if it's too difficult to read, it is pasted here: http://dpaste.com/hold/17151/ and will be posted on the Stackless Examples SVN.
#
# Stackless Asynchronous file module:
#
# Author: Carlos E. de Paula <carlosedp@gmail.com>
#
# This code was written to serve as an example of Stackless Python usage.
# Feel free to email me with any questions, comments, or suggestions for
# improvement.
#
# This is an asynchronous file class in order to have a file module replacement
# that uses channels and a windows async API to allow its methods to
# block just the calling tasklet not the entire interpreter.
#
#
import os
import time
import stackless
from ctypes import *
from ctypes.wintypes import HANDLE, ULONG, DWORD, BOOL, LPCSTR, LPCWSTR, WinError
# Platform guard: the rest of the module binds kernel32 entry points through
# ctypes.windll, so refuse to be imported anywhere but on Windows ('nt').
if not os.name == 'nt':
    raise ImportError('This module has been implemented for windows systems.')
# Windows structures
class _US(Structure):
_fields_ = [
("Offset", DWORD),
("OffsetHigh", DWORD),
]
class _U(Union):
    """Union overlaying the (Offset, OffsetHigh) pair with a raw pointer."""
    # Declared anonymous so the members of "s" are reachable directly on _U.
    _anonymous_ = ("s",)
    _fields_ = [
        ("s", _US),
        ("Pointer", c_void_p),
    ]
class OVERLAPPED(Structure):
    """
    ctypes layout of the Win32 OVERLAPPED structure plus one trailing field.

    The extra taskletID member travels with the structure through the IO
    completion port, so the code that dequeues a completion can find out
    which pending request it belongs to.  Thanks to the anonymous unions,
    Offset, OffsetHigh and Pointer are accessible directly on an instance.
    """
    _fields_ = [
        ("Internal", POINTER(ULONG)),
        ("InternalHigh", POINTER(ULONG)),
        ("u", _U),
        ("hEvent", HANDLE),
        # Custom fields.
        # The ID stored here is id(<file object>), i.e. a pointer-sized
        # integer.  A 32-bit ULONG would truncate it on 64-bit builds and let
        # two different objects share one ID, so use a pointer-sized type.
        ("taskletID", c_size_t),
    ]
    _anonymous_ = ("u",)
# Windows kernel32 API
def _kernel32(name, argtypes, restype):
    """Look up *name* in kernel32 and attach its ctypes prototype."""
    func = getattr(windll.kernel32, name)
    func.argtypes = argtypes
    func.restype = restype
    return func

# ReadFile and WriteFile share one parameter list:
# (hFile, buffer, byte count, out: bytes transferred, OVERLAPPED*).
_transfer_argtypes = (HANDLE, c_void_p, DWORD, POINTER(DWORD),
                      POINTER(OVERLAPPED))
# CreateFileA/CreateFileW differ only in the type of the file name.
_create_file_tail = (DWORD, DWORD, c_void_p, DWORD, DWORD, HANDLE)

CreateIoCompletionPort = _kernel32(
    'CreateIoCompletionPort',
    (HANDLE, HANDLE, POINTER(c_ulong), DWORD), HANDLE)
GetQueuedCompletionStatus = _kernel32(
    'GetQueuedCompletionStatus',
    (HANDLE, POINTER(DWORD), POINTER(c_ulong),
     POINTER(POINTER(OVERLAPPED)), DWORD), BOOL)
ReadFile = _kernel32('ReadFile', _transfer_argtypes, BOOL)
WriteFile = _kernel32('WriteFile', _transfer_argtypes, BOOL)
CreateFileA = _kernel32('CreateFileA', (LPCSTR,) + _create_file_tail, HANDLE)
CreateFileW = _kernel32('CreateFileW', (LPCWSTR,) + _create_file_tail, HANDLE)
CloseHandle = _kernel32('CloseHandle', (HANDLE,), BOOL)
# GetLastError takes no arguments; the default prototype is used as is.
GetLastError = windll.kernel32.GetLastError
# Python API
pythonapi.PyErr_SetFromErrno.argtypes = (py_object,)
pythonapi.PyErr_SetFromErrno.restype = py_object
# Windows definitions
# INVALID_HANDLE_VALUE is (HANDLE)-1.  Functions declared with a HANDLE
# (c_void_p) restype return it as an unsigned pointer-sized integer, so the
# constant is derived from the pointer width: 0xFFFFFFFF on 32-bit builds and
# 0xFFFFFFFFFFFFFFFF on 64-bit builds.  A hard-coded 32-bit value would never
# compare equal to a failed CreateFile result on 64-bit Windows.
INVALID_HANDLE_VALUE = c_void_p(-1).value
# Zero-valued c_ulong used wherever the API calls below need a null/zero.
NULL = c_ulong()
# Error codes reported by GetLastError().
WAIT_TIMEOUT = 0x102
ERROR_IO_PENDING = 997
# CreateFile flags and attributes.
FILE_FLAG_RANDOM_ACCESS = 0x10000000
FILE_FLAG_OVERLAPPED = 0x40000000
# CreateFile desired-access bits.
GENERIC_READ = 0x80000000
GENERIC_WRITE = 0x40000000
FILE_APPEND_DATA = 0x00000004
# CreateFile share modes.
FILE_SHARE_READ = 0x00000001
FILE_SHARE_WRITE = 0x00000002
# CreateFile creation dispositions.
OPEN_EXISTING = 3
OPEN_ALWAYS = 4
CREATE_ALWAYS = 2
# ----------------------------------------------------------------------------
class resultsManager(object):
    """
    Dispatches IO completion notifications back to the waiting tasklets.

    The manager owns one IO completion port.  A file object registers a
    channel under the taskletID it stored in its OVERLAPPED structure; poll()
    dequeues completion packets from the port, looks the channel up by that
    ID, removes it from the pending dict and sends the number of transferred
    bytes through it, which unblocks the tasklet that issued the request.
    """
    def __init__(self, numThreads=NULL):
        """
        Create the completion port and start the polling tasklet.

        numThreads is passed to CreateIoCompletionPort as its last argument.
        Raises the exception built by WinError() if the port cannot be
        created.
        """
        # Give every attribute a value before touching the OS so that
        # __del__ and poll() never see a half-initialised object.
        self.handle = None
        self.overlappedByID = {}
        self.numThreads = numThreads
        self.running = True
        # None is the portable way to pass a null HANDLE through ctypes.
        port = CreateIoCompletionPort(INVALID_HANDLE_VALUE, None,
                                      NULL, numThreads)
        # A HANDLE (c_void_p) restype yields None, not 0, for a NULL result,
        # so test for falsiness instead of comparing with 0.
        if not port:
            self.running = False
            raise WinError()
        self.handle = port
        stackless.tasklet(self.poll)()

    def __del__(self):
        """Drop all pending registrations and close the completion port."""
        port = getattr(self, 'handle', None)
        if port is None:
            return
        self.overlappedByID.clear()
        self.handle = None
        CloseHandle(port)

    def poll(self, timeout=1):
        """
        Tasklet body: pump completion packets while requests are pending.

        timeout is passed to GetQueuedCompletionStatus as its wait time.
        The loop ends when self.running is cleared or when no channel is
        registered any more; self.running is False once it has ended (also
        when it ended because of an exception) so that callers can detect
        this and start a new polling tasklet.
        """
        try:
            while self.running and self.overlappedByID:
                numBytes = DWORD()
                completionKey = c_ulong()
                ovp = POINTER(OVERLAPPED)()
                ret = GetQueuedCompletionStatus(self.handle, byref(numBytes),
                                                byref(completionKey),
                                                byref(ovp), timeout)
                if not ovp:
                    # Nothing was dequeued.  A timeout is the normal case;
                    # anything else is a real failure and must not fall
                    # through to a NULL pointer dereference below.
                    if ret == 0:
                        error = GetLastError()
                        if error != WAIT_TIMEOUT:
                            raise WinError(error)
                    stackless.schedule()
                    continue
                # Remove the registration BEFORE sending: the send may run
                # the receiver right away, and it may register a new request
                # under the same ID before this tasklet resumes.  Deleting
                # the entry afterwards would discard that new request.
                c = self.overlappedByID.pop(ovp.contents.taskletID, None)
                if c is None:
                    # Completion for a request nobody is waiting on.
                    continue
                c.send(numBytes)
        finally:
            self.running = False

    def RegisterChannelObject(self, ob, c):
        """Remember channel c as the one to notify for request ID ob."""
        self.overlappedByID[ob] = c

    def UnregisterChannelObject(self, ob):
        """Forget request ID ob; unknown IDs are ignored."""
        self.overlappedByID.pop(ob, None)
# Module-wide manager instance; the stacklessfile methods below use it to
# associate their handles with its completion port and to register channels.
mng = resultsManager()
class stacklessfile(object):
    """
    stacklessfile(name[, mode[, buffering]]) -> stackless file object

    File-like object whose read() and write() block only the calling
    tasklet, using Windows overlapped IO and an IO completion port.  With
    the standard file object the whole interpreter is blocked until the
    operation has finished.

    Open a file.  The mode can be 'r', 'w' or 'a' for reading (default),
    writing or appending.  The file will be created if it doesn't exist
    when opened for writing or appending; it will be truncated when
    opened for writing.  Add a 'b' to the mode for binary files.
    Add a '+' to the mode to allow simultaneous reading and writing.

    The buffering argument is accepted for compatibility with file() but is
    ignored, and universal newline ('U') handling is not implemented.

    The object keeps the state of the request in flight on itself (o, buffer,
    channel), so only one tasklet at a time should use a given instance.
    """
    closed = True

    def __init__(self, name, mode='r', buffering=-1):
        """
        Open the file called name with the given mode.

        Raises the exception built by WinError() if the file cannot be
        opened.
        """
        if not self.closed:
            self.close()
        self.name = name
        self.mode = mode
        self.offset = 0
        self.iocpLinked = False
        self._check_manager()
        self.open_handle()
        self.closed = False

    def __repr__(self):
        state = [ "open", "closed" ][self.closed]
        return "<%s file '%s', mode '%s' at 0x%08X>" % (state, self.name,
                                                        self.mode, id(self))

    def __del__(self):
        self.close()

    def _check_manager(self):
        """Restart the manager's polling tasklet if it has stopped."""
        if not mng.running:
            stackless.tasklet(mng.poll)()
            mng.running = True

    def _check_still_open(self):
        """Raise ValueError if the file has been closed."""
        if self.closed:
            raise ValueError("I/O operation on closed file")

    def _ensure_iocp_association(self):
        """Associate the file handle with the manager's port (once)."""
        if not self.iocpLinked:
            port = CreateIoCompletionPort(self.handle, mng.handle, NULL,
                                          mng.numThreads)
            # NULL (None through ctypes) means the association failed; no
            # completion would ever be delivered, so fail loudly here.
            if not port:
                raise WinError()
            self.iocpLinked = True

    def _begin_overlapped(self):
        """Prepare self.o and self.channel for a request at self.offset."""
        self.o = OVERLAPPED()
        # The 64-bit file position is split over two 32-bit fields.
        self.o.Offset = self.offset & 0xFFFFFFFF
        self.o.OffsetHigh = self.offset >> 32
        self.o.taskletID = id(self)
        self.channel = stackless.channel()
        self._ensure_iocp_association()
        self._check_manager()

    def _await_completion(self, started):
        """
        Block the calling tasklet until the request just issued completes.

        started is the BOOL returned by ReadFile/WriteFile.  Returns the
        number of bytes transferred as reported by the completion port.
        Raises IOError if the request could not be started.
        """
        if started == 0:
            error = GetLastError()
            if error != ERROR_IO_PENDING:
                raise IOError(error, FormatError(error).strip(), self.name)
        # A completion packet is queued to the port even when the call
        # finished synchronously, so always wait for it.  Leaving it in the
        # queue would let it wake up a later request on this file too early.
        mng.RegisterChannelObject(self.o.taskletID, self.channel)
        return self.channel.receive().value

    def close(self):
        """
        close() -> None.  Close the file.

        Sets data attribute .closed to True.  A closed file cannot be used
        for further I/O operations.  close() may be called more than once
        without error.
        """
        if not self.closed:
            # Mark closed first so the handle can never be closed twice.
            self.closed = True
            handle = self.handle
            del self.handle
            CloseHandle(handle)

    def open_handle(self):
        """
        Open self.name with CreateFile according to self.mode.

        Sets self.handle, self.binary and self.iocpLinked; in append mode
        self.offset is moved to the current end of the file.  Raises the
        exception built by WinError() if the file cannot be opened.
        """
        mode = self.mode
        self.binary = 'b' in mode
        read_update = 'r' in mode and '+' in mode

        access = GENERIC_READ
        if 'w' in mode or read_update:
            access |= GENERIC_WRITE
        if 'a' in mode:
            access |= FILE_APPEND_DATA
        share = FILE_SHARE_READ | FILE_SHARE_WRITE

        if 'w' in mode:
            disposition = CREATE_ALWAYS
        elif read_update or 'a' in mode:
            # Appending creates a missing file, as documented on the class.
            # NOTE(review): 'r+' also creates a missing file here, whereas
            # the builtin file() fails in that case; kept as it was for
            # backward compatibility.
            disposition = OPEN_ALWAYS
        else:
            disposition = OPEN_EXISTING
        flags = FILE_FLAG_RANDOM_ACCESS | FILE_FLAG_OVERLAPPED

        if isinstance(self.name, unicode):
            func = CreateFileW
        else:
            func = CreateFileA
        # None is the portable way to pass a null template HANDLE.
        handle = func(self.name, access,
                      share, c_void_p(), disposition,
                      flags, None)
        # INVALID_HANDLE_VALUE is (HANDLE)-1, whose unsigned value depends on
        # the pointer width; a NULL result arrives as None.
        if handle is None or handle == c_void_p(-1).value:
            raise WinError()
        self.handle = handle
        self.iocpLinked = False
        if 'a' in mode:
            self.offset = int(os.path.getsize(self.name))

    def read(self, size=None):
        """
        read([size]) -> read at most size bytes, returned as a string.

        If size is omitted, None or negative, read until the end of the
        file.  Returns an empty string at (or beyond) the end of the file.
        Only the calling tasklet is blocked while the data is read.
        """
        self._check_still_open()
        maxBytesToRead = int(os.path.getsize(self.name)) - self.offset
        if size is None or size < 0 or maxBytesToRead < size:
            size = maxBytesToRead
        if size <= 0:
            # Nothing left (or nothing requested): behave like file.read().
            return ''
        bytesRead = DWORD()
        self.buffer = create_string_buffer(size)
        self._begin_overlapped()
        r = ReadFile(self.handle, self.buffer,
                     size, byref(bytesRead), byref(self.o))
        count = self._await_completion(r)
        # Advance by what was really transferred, not by what was asked for.
        self.offset += count
        return self.buffer[:count]

    def write(self, data):
        """
        write(str) -> None.  Write string str to file.

        Only the calling tasklet is blocked while the data is written.
        Raises IOError if the request cannot be started and the exception
        built by WinError() if fewer bytes than requested were written.
        """
        self._check_still_open()
        bytesToWrite = c_int()
        writeBufferPtr = c_char_p()
        bytesWritten = DWORD()
        # Let the interpreter hand out the buffer and length of data, using
        # the same format codes as the builtin file object.
        fmt = "s#" if self.binary else "t#"
        ret = pythonapi.PyArg_ParseTuple(py_object((data,)), c_char_p(fmt),
                                         byref(writeBufferPtr),
                                         byref(bytesToWrite))
        if ret == 0:
            raise WinError()
        if bytesToWrite.value == 0:
            return
        self._begin_overlapped()
        r = WriteFile(self.handle, writeBufferPtr,
                      bytesToWrite.value, byref(bytesWritten), byref(self.o))
        written = self._await_completion(r)
        # Move the file position past the data just written; without this
        # every write would start at the same offset and overwrite the
        # previous one.
        self.offset += written
        if bytesToWrite.value != written:
            # Check if the quantity of bytes sent has been written to the file
            raise WinError()

    def tell(self):
        """
        tell() -> current file position, an integer (may be a long integer).
        """
        return self.offset

    def seek(self, offset, whence=os.SEEK_SET):
        """
        seek(offset[, whence]) -> None.  Move to new file position.

        Argument offset is a byte count.  Optional argument whence defaults
        to 0 (offset from start of file, offset should be >= 0); other values
        are 1 (move relative to current position, positive or negative), and
        2 (move relative to end of file, usually negative).

        Raises IOError for an unknown whence or a negative resulting
        position; the current position is left unchanged in that case.
        """
        self._check_still_open()
        if whence == os.SEEK_SET:
            target = offset
        elif whence == os.SEEK_CUR:
            target = self.offset + offset
        elif whence == os.SEEK_END:
            target = int(os.path.getsize(self.name)) + offset
        else:
            raise IOError(22, "Invalid argument")
        if target < 0:
            raise IOError(22, "Invalid argument")
        self.offset = target

    def flush(self):
        """flush() -> None.  Nothing is buffered by this class."""
        pass

    def isatty(self):
        """
        isatty() -> true or false.  True if the file is connected to a tty device.
        """
        self._check_still_open()
        return False
if __name__ == '__main__':
    # Demo / smoke test: copies two generated files many times over from
    # concurrently running tasklets and reports the timings.
    import time
    import os
    stdfile = file
    # On your stackless apps, use these 2 lines below
    # (the module is expected to be importable as "stacklessfile").
    from stacklessfile import stacklessfile as file
    open = file

    # Every file this script creates, so that the cleanup removes exactly
    # these and never an unrelated file that merely matches a pattern.
    created = []

    # Function to copy a file
    def copyfile(who, infile, out):
        st = time.time()
        f1 = file(infile, 'rb')
        try:
            f2 = file(out, 'wb')
            try:
                print("%s started reading %s ..." % (who, infile))
                a = f1.read()
                print("%s started writing %s -> %s ..." % (who, infile, out))
                f2.write(a)
            finally:
                f2.close()
        finally:
            f1.close()
        print("Finished tasklet %s (%s) in %s" % (who, infile, time.time()-st))

    # Write the decimal numbers 0..count-1 back to back into path.
    def makedummy(path, count):
        newfile = stdfile(path, 'w')
        created.append(path)
        try:
            for x in xrange(count):
                newfile.write(str(x))
        finally:
            newfile.close()

    try:
        # Creating two dummy files
        makedummy('test-small.txt', 10000)
        makedummy('test-big.txt', 500000)
        # Launching tasklets to perform the file copy
        for i in xrange(1,11):
            out = 'big%s.txt' % i
            created.append(out)
            stackless.tasklet(copyfile)(i, 'test-big.txt', out)
        for i in xrange(1,21):
            out = 'sm%s.txt' % i
            created.append(out)
            stackless.tasklet(copyfile)(i, 'test-small.txt', out)
        st = time.time()
        stackless.run()
        print("Total time is %s seconds." % (time.time() - st))
    finally:
        # Cleanup all test files used
        for f in created:
            if os.path.exists(f):
                os.unlink(f)
Friday, August 17, 2007
Subscribe to:
Post Comments (Atom)
3 comments:
This is excellent work.
did you fix SSL yet?
Who knows where to download XRumer 5.0 Palladium?
Help, please. All recommend this program to effectively advertise on the Internet, this is the best program!
Post a Comment