首页 文章 精选 留言 我的

精选列表

搜索[工具库],共10000篇文章
优秀的个人博客,低调大师

Spring Reactor 项目核心

Reactor Core Non-Blocking Reactive Streams Foundation for the JVM both implementing a Reactive Extensions inspired API and efficient event streaming support. Getting it Reactor 3 requires Java 8 or + to run. With Gradle from repo.spring.io or Maven Central repositories (stable releases only): repositories { // maven { url 'http://repo.spring.io/snapshot' } maven { url 'http://repo.spring.io/milestone' } mavenCentral() } dependencies { //compile "io.projectreactor:reactor-core:3.1.4.RELEASE" //testCompile("io.projectreactor:reactor-test:3.1.4.RELEASE") compile "io.projectreactor:reactor-core:3.2.0.M1" testCompile("io.projectreactor:reactor-test:3.2.0.M1") } See the reference documentation for more information on getting it (eg. using Maven, or on how to get milestones and snapshots). Note about Android support: Reactor 3 doesn't officially support nor target Android. However it should work fine with Android SDK 26 (Android O) and above. See thecomplete note in the reference guide. Getting Started New to Reactive Programming or bored of reading already ? Try the Introduction to Reactor Core hands-on ! If you are familiar with RxJava or if you want to check more detailled introduction, be sure to checkhttps://www.infoq.com/articles/reactor-by-example ! Flux A Reactive Streams Publisher with basic flow operators. Static factories on Flux allow for source generation from arbitrary callbacks types. Instance methods allows operational building, materialized on each Flux#subscribe(), Flux#subscribe() or multicasting operations such as Flux#publish and Flux#publishNext. <img src="https://raw.githubusercontent.com/reactor/reactor-core/v3.1.3.RELEASE/src/docs/marble/flux.png" width="500"> Flux in action : Flux.fromIterable(getSomeLongList()) .mergeWith(Flux.interval(100)) .doOnNext(serviceA::someObserver) .map(d -> d * 2) .take(3) .onErrorResumeWith(errorHandler::fallback) .doAfterTerminate(serviceM::incrementTerminate) .subscribe(System.out::println); Mono A Reactive Streams Publisher constrained to ZERO or ONE element with appropriate operators. Static factories on Mono allow for deterministic zero or one sequence generation from arbitrary callbacks types. Instance methods allows operational building, materialized on each Mono#subscribe() or Mono#get() eventually called. <img src="https://raw.githubusercontent.com/reactor/reactor-core/v3.1.3.RELEASE/src/docs/marble/mono.png" width="500"> Mono in action : Mono.fromCallable(System::currentTimeMillis) .flatMap(time -> Mono.first(serviceA.findRecent(time), serviceB.findRecent(time))) .timeout(Duration.ofSeconds(3), errorHandler::fallback) .doOnSuccess(r -> serviceM.incrementSuccess()) .subscribe(System.out::println); Blocking Mono result : Tuple2<Long, Long> nowAndLater = Mono.zip( Mono.just(System.currentTimeMillis()), Flux.just(1).delay(1).map(i -> System.currentTimeMillis())) .block(); Schedulers Reactor uses a Scheduler as a contract for arbitrary task execution. It provides some guarantees required by Reactive Streams flows like FIFO execution. You can use or create efficient schedulers to jump thread on the producing flows (subscribeOn) or receiving flows (publishOn): Mono.fromCallable( () -> System.currentTimeMillis() ) .repeat() .publishOn(Schedulers.single()) .log("foo.bar") .flatMap(time -> Mono.fromCallable(() -> { Thread.sleep(1000); return time; }) .subscribeOn(Schedulers.parallel()) , 8) //maxConcurrency 8 .subscribe(); ParallelFlux ParallelFlux can starve your CPU's from any sequence whose work can be subdivided in concurrent tasks. Turn back into a Flux with ParallelFlux#sequential(), an unordered join or use abitrary merge strategies via 'groups()'. Mono.fromCallable( () -> System.currentTimeMillis() ) .repeat() .parallel(8) //parallelism .runOn(Schedulers.parallel()) .doOnNext( d -> System.out.println("I'm on thread "+Thread.currentThread()) ) .subscribe() Custom sources : Flux.create and FluxSink, Mono.create and MonoSink To bridge a Subscriber or Processor into an outside context that is taking care of producing non concurrently, use Flux#create, Mono#create. Flux.create(sink -> { ActionListener al = e -> { sink.next(textField.getText()); }; // without cancellation support: button.addActionListener(al); // with cancellation support: sink.onCancel(() -> { button.removeListener(al); }); }, // Overflow (backpressure) handling, default is BUFFER FluxSink.OverflowStrategy.LATEST) .timeout(3) .doOnComplete(() -> System.out.println("completed!")) .subscribe(System.out::println) The Backpressure Thing Most of this cool stuff uses bounded ring buffer implementation under the hood to mitigate signal processing difference between producers and consumers. Now, the operators and processors or any standard reactive stream component working on the sequence will be instructed to flow in when these buffers have free room AND only then. This means that we make sure we both have a deterministic capacity model (bounded buffer) and we never block (request more data on write capacity). Yup, it's not rocket science after all, the boring part is already being worked by us in collaboration with Reactive Streams Commons on going research effort. What's more in it ? "Operator Fusion" (flow optimizers), health state observers, helpers to build custom reactive components, bounded queue generator, hash-wheel timer, converters from/to Java 9 Flow, Publisher and Java 8 CompletableFuture. The repository contains a reactor-test project with test features like the StepVerifier. Reference Guide http://projectreactor.io/docs/core/release/reference/docs/index.html Javadoc https://projectreactor.io/docs/core/release/api/ Getting started with Flux and Mono https://github.com/reactor/lite-rx-api-hands-on Reactor By Example https://www.infoq.com/articles/reactor-by-example Head-First Spring & Reactor https://github.com/reactor/head-first-reactive-with-spring-and-reactor/ Beyond Reactor Core Everything to jump outside the JVM with the non-blocking drivers from Reactor Netty. Reactor Addons provide for adapters and extra operators for Reactor 3. Powered by Reactive Streams Commons Licensed under Apache Software License 2.0 Sponsored by Pivotal

优秀的个人博客,低调大师

python标准学习7

使用 os.path 模块处理文件名 import os filename = "my/little/pony" print "using", os.name, "..." print "split", "=>", os.path.split(filename) print "splitext", "=>", os.path.splitext(filename) print "dirname", "=>", os.path.dirname(filename) print "basename", "=>", os.path.basename(filename) print "join", "=>", os.path.join(os.path.dirname(filename), os.path.basename(filename)) using nt ... split => ('my/little', 'pony') splitext => ('my/little/pony', '') dirname => my/little basename => pony join => my/little\pony 当前目录和上一级目录 >>> os.pardir '..' >>> os.curdir '.' 使用 os.path 模块检查文件名的特征 import os FILES = ( os.curdir, "/", "file", "/file", "samples", "samples/sample.jpg", "directory/file", "../directory/file", "/directory/file" ) for file in FILES: print file, "=>", if os.path.exists(file): print "EXISTS", if os.path.isabs(file): print "ISABS", if os.path.isdir(file): print "ISDIR", if os.path.isfile(file): print "ISFILE", if os.path.islink(file): print "ISLINK", if os.path.ismount(file): print "ISMOUNT", print . => EXISTS ISDIR / => EXISTS ISABS ISDIR ISMOUNT file => /file => ISABS samples => EXISTS ISDIR samples/sample.jpg => EXISTS ISFILE directory/file => ../directory/file => /directory/file => ISABS expanduser函数以与大部分Unix shell相同的方式处理用户名快捷符号(~, 不过在 Windows 下工作不正常), 使用 os.path 模块将用户名插入到文件名 import os print os.path.expanduser("~/.pythonrc") # /home/effbot/.pythonrc expandvars函数将文件名中的环境变量替换为对应值 使用 os.path 替换文件名中的环境变量 import os os.environ["USER"] = "user" print os.path.expandvars("/home/$USER/config") print os.path.expandvars("$USER/folders") /home/user/config user/folders 列出目录下所有的文件和目录 >>> a=[file for file in os.listdir("d:\\new")] >>> for i in a: print i walk函数会帮你找出一个目录树下的所有文件. 它的参数依次是目录名, 回调函数, 以及传递给回调函数的数据对象. 使用 os.path 搜索文件系统 import os def callback(arg, directory, files): for file in files: print os.path.join(directory, file), repr(arg) os.path.walk(".", callback, "secret message") ./aifc-example-1.py 'secret message' ./anydbm-example-1.py 'secret message' ./array-example-1.py 'secret message' ... ./samples 'secret message' ./samples/sample.jpg 'secret message' ./samples/sample.txt 'secret message' ./samples/sample.zip 'secret message' ./samples/articles 'secret message' ./samples/articles/article-1.txt 'secret message' ./samples/articles/article-2.txt 'secret message' ... index函数会返回一个文件名列表, 你可以直接使用for-in循环处理文件. 使用 os.listdir 搜索文件系统 import os def index(directory): # like os.listdir, but traverses directory trees stack = [directory] files = [] while stack: directory = stack.pop() for file in os.listdir(directory): fullname = os.path.join(directory, file) files.append(fullname) if os.path.isdir(fullname) and not os.path.islink(fullname): stack.append(fullname) return files for file in index("."): print file .\aifc-example-1.py .\anydbm-example-1.py .\array-example-1.py ... 一次返回一个文件 import os class DirectoryWalker: # a forward iterator that traverses a directory tree def _ _init_ _(self, directory): self.stack = [directory] self.files = [] self.index = 0 def _ _getitem_ _(self, index): while 1: try: file = self.files[self.index] self.index = self.index + 1 except IndexError: # pop next directory from stack self.directory = self.stack.pop() self.files = os.listdir(self.directory) self.index = 0 else: # got a filename fullname = os.path.join(self.directory, file) if os.path.isdir(fullname) and not os.path.islink(fullname): self.stack.append(fullname) return fullname for file in DirectoryWalker("."): print file .\aifc-example-1.py .\anydbm-example-1.py .\array-example-1.py ... 注意DirectoryWalker类并不检查传递给_ _getitem_ _方法的索引值. 这意味着如果你越界访问序列成员(索引数字过大)的话, 这个类将不能正常工作. 下面这个例子它返回文件名和它的os.stat属性(一个元组). 这个版本在每个文件上都能节省一次或两次stat调用(os.path.isdir和os.path.islink内部都使用了stat), 并且在一些平台上运行很快. 使用 DirectoryStatWalker 搜索文件系统 import os, stat class DirectoryStatWalker: # a forward iterator that traverses a directory tree, and # returns the filename and additional file information def _ _init_ _(self, directory): self.stack = [directory] self.files = [] self.index = 0 def _ _getitem_ _(self, index): while 1: try: file = self.files[self.index] self.index = self.index + 1 except IndexError: # pop next directory from stack self.directory = self.stack.pop() self.files = os.listdir(self.directory) self.index = 0 else: # got a filename fullname = os.path.join(self.directory, file) st = os.stat(fullname) mode = st[stat.ST_MODE] if stat.S_ISDIR(mode) and not stat.S_ISLNK(mode): self.stack.append(fullname) return fullname, st for file, st in DirectoryStatWalker("."): print file, st[stat.ST_SIZE] .\aifc-example-1.py 336 .\anydbm-example-1.py 244 .\array-example-1.py 526 Using the stat Module import stat import os, time st = os.stat("samples/sample.txt") print "mode", "=>", oct(stat.S_IMODE(st[stat.ST_MODE])) print "type", "=>", if stat.S_ISDIR(st[stat.ST_MODE]): print "DIRECTORY", if stat.S_ISREG(st[stat.ST_MODE]): print "REGULAR", if stat.S_ISLNK(st[stat.ST_MODE]): print "LINK", print print "size", "=>", st[stat.ST_SIZE] print "last accessed", "=>", time.ctime(st[stat.ST_ATIME]) print "last modified", "=>", time.ctime(st[stat.ST_MTIME]) print "inode changed", "=>", time.ctime(st[stat.ST_CTIME]) mode => 0664 type => REGULAR size => 305 last accessed => Sun Oct 10 22:12:30 1999 last modified => Sun Oct 10 18:39:37 1999 inode changed => Sun Oct 10 15:26:38 1999 使用 string 模块 import string text = "Monty Python's Flying Circus" print "upper", "=>", string.upper(text) print "lower", "=>", string.lower(text) print "split", "=>", string.split(text) print "join", "=>", string.join(string.split(text), "+") print "replace", "=>", string.replace(text, "Python", "Java") print "find", "=>", string.find(text, "Python"), string.find(text, "Java") print "count", "=>", string.count(text, "n") upper => MONTY PYTHON'S FLYING CIRCUS lower => monty python's flying circus split => ['Monty', "Python's", 'Flying', 'Circus'] join => Monty+Python's+Flying+Circus replace => Monty Java's Flying Circus find => 6 -1 count => 3 使用字符串方法替代 string 模块函数 text = "Monty Python's Flying Circus" print "upper", "=>", text.upper() print "lower", "=>", text.lower() print "split", "=>", text.split() print "join", "=>", "+".join(text.split()) print "replace", "=>", text.replace("Python", "Perl") print "find", "=>", text.find("Python"), text.find("Perl") print "count", "=>", text.count("n") upper => MONTY PYTHON'S FLYING CIRCUS lower => monty python's flying circus split => ['Monty', "Python's", 'Flying', 'Circus'] join => Monty+Python's+Flying+Circus replace => Monty Perl's Flying Circus find => 6 -1 count => 3 使用 string 模块将字符串转为数字 import string print int("4711"), print string.atoi("4711"), print string.atoi("11147", 8), # octal 八进制 print string.atoi("1267", 16), # hexadecimal 十六进制 print string.atoi("3mv", 36) # whatever... print string.atoi("4711", 0), print string.atoi("04711", 0), print string.atoi("0x4711", 0) print float("4711"), print string.atof("1"), print string.atof("1.23e5") 4711 4711 4711 4711 4711 4711 2505 18193 4711.0 1.0 123000.0 operator模块为 Python 提供了一个 "功能性" 的标准操作符接口. 当使用map以及filter一类的函数的时候,operator模块中的函数可以替换一些lambda函式. 而且这些函数在一些喜欢写晦涩代码的程序员中很流行. 使用 operator 模块 print "add", "=>", reduce(operator.add, sequence) print "sub", "=>", reduce(operator.sub, sequence) print "mul", "=>", reduce(operator.mul, sequence) print "concat", "=>", operator.concat("spam", "egg") print "repeat", "=>", operator.repeat("spam", 5) print "getitem", "=>", operator.getitem(sequence, 2) print "indexOf", "=>", operator.indexOf(sequence, 2) print "sequenceIncludes", "=>", operator.sequenceIncludes(sequence, 3) add => 7 sub => -5 mul => 8 concat => spamegg repeat => spamspamspamspamspam getitem => 4 indexOf => 1 sequenceIncludes => 0 使用 operator 模块检查类型 import operator import UserList def dump(data): print type(data), "=>", if operator.isCallable(data): print "CALLABLE", if operator.isMappingType(data): print "MAPPING", if operator.isNumberType(data): print "NUMBER", if operator.isSequenceType(data): print "SEQUENCE", print dump(0) dump("string") dump("string"[0]) dump([1, 2, 3]) dump((1, 2, 3)) dump({"a": 1}) dump(len) # function 函数 dump(UserList) # module 模块 dump(UserList.UserList) # class 类 dump(UserList.UserList()) # instance 实例 <type 'int'> => NUMBER <type 'string'> => SEQUENCE <type 'string'> => SEQUENCE <type 'list'> => SEQUENCE <type 'tuple'> => SEQUENCE <type 'dictionary'> => MAPPING <type 'builtin_function_or_method'> => CALLABLE <type 'module'> => <type 'class'> => CALLABLE <type 'instance'> => MAPPING NUMBER SEQUENCE copy模块包含两个函数, 用来拷贝对象 使用 copy 模块复制对象 import copy a = [[1],[2],[3]] b = copy.copy(a) print "before", "=>" print a print b # modify original a[0][0] = 0 a[1] = None print "after", "=>" print a print b before => [[1], [2], [3]] [[1], [2], [3]] after => [[0], None, [3]] [[0], [2], [3]] 使用 copy 模块复制集合(Collections) import copy a = [[1],[2],[3]] b = copy.deepcopy(a) print "before", "=>" print a print b # modify original a[0][0] = 0 a[1] = None print "after", "=>" print a print b before => [[1], [2], [3]] [[1], [2], [3]] after => [[0], None, [3]] [[1], [2], [3]] 使用sys模块获得脚本的参数 import sys print "script name is", sys.argv[0] if len(sys.argv) > 1: print "there are", len(sys.argv)-1, "arguments:" for arg in sys.argv[1:]: print arg else: print "there are no arguments!" script name is sys-argv-example-1.py there are no arguments! 使用sys模块操作模块搜索路径 import sys print "path has", len(sys.path), "members" # add the sample directory to the path sys.path.insert(0, "samples") import sample # nuke the path sys.path = [] import random # oops! path has 7 members this is the sample module! Traceback (innermost last): File "sys-path-example-1.py", line 11, in ? import random # oops! ImportError: No module named random 使用sys模块查找内建模块 import sys def dump(module): print module, "=>", if module in sys.builtin_module_names: print "<BUILTIN>" else: module = _ _import_ _(module) print module._ _file_ _ dump("os") dump("sys") dump("string") dump("strop") dump("zlib") os => C:\python\lib\os.pyc sys => <BUILTIN> string => C:\python\lib\string.pyc strop => <BUILTIN> zlib => C:\python\zlib.pyd 使用sys模块查找已导入的模块 modules字典包含所有加载的模块.import语句在从磁盘导入内容之前会先检查这个字典. import sys print sys.modules.keys() ['os.path', 'os', 'exceptions', '_ _main_ _', 'ntpath', 'strop', 'nt', 'sys', '_ _builtin_ _', 'site', 'signal', 'UserDict', 'string', 'stat'] getrefcount函数返回给定对象的引用记数 - 也就是这个对象使用次数. Python 会跟踪这个值, 当它减少为0的时候, 就销毁这个对象. 使用sys模块获得引用记数 import sys variable = 1234 print sys.getrefcount(0) print sys.getrefcount(variable) print sys.getrefcount(None) 50 3 192 注意这个值总是比实际的数量大, 因为该函数本身在确定这个值的时候依赖这个对象 使用sys模块获得当前平台 import sys # # emulate "import os.path" (sort of)... if sys.platform == "win32": import ntpath pathmodule = ntpath elif sys.platform == "mac": import macpath pathmodule = macpath else: # assume it's a posix platform import posixpath pathmodule = posixpath print pathmodule setprofiler函数允许你配置一个分析函数(profiling function). 这个函数会在每次调用某个函数或方法时被调用(明确或隐含的), 或是遇到异常的时候被调用. 使用sys模块配置分析函数 import sys def test(n): j = 0 for i in range(n): j = j + i return n def profiler(frame, event, arg): print event, frame.f_code.co_name, frame.f_lineno, "->", arg # profiler is activated on the next call, return, or exception # 分析函数将在下次函数调用, 返回, 或异常时激活 sys.setprofile(profiler) # profile this function call # 分析这次函数调用 test(1) # disable profiler # 禁用分析函数 sys.setprofile(None) # don't profile this call # 不会分析这次函数调用 test(2) call test 3 -> None return test 7 -> 1 使用sys模块配置单步跟踪函数 import sys def test(n): j = 0 for i in range(n): j = j + i return n def tracer(frame, event, arg): print event, frame.f_code.co_name, frame.f_lineno, "->", arg return tracer # tracer is activated on the next call, return, or exception # 跟踪器将在下次函数调用, 返回, 或异常时激活 sys.settrace(tracer) # trace this function call # 跟踪这次函数调用 test(1) # disable tracing # 禁用跟踪器 sys.settrace(None) # don't trace this call # 不会跟踪这次函数调用 test(2) call test 3 -> None line test 3 -> None line test 4 -> None line test 5 -> None line test 5 -> None line test 6 -> None line test 5 -> None line test 7 -> None return test 7 -> 1 ============================================================================== 本文转自被遗忘的博客园博客,原文链接:http://www.cnblogs.com/rollenholt/archive/2011/11/26/2264682.html,如需转载请自行联系原作者

优秀的个人博客,低调大师

python标准学习9

fileinput模块允许你循环一个或多个文本文件的内容 使用 fileinput 模块循环一个文本文件 import fileinput import sys for line in fileinput. input ( "samples/sample.txt" ): sys.stdout.write( "-> " ) sys.stdout.write(line) - > We will perhaps eventually be writing only small - > modules which are identified by name as they are - > used to build larger ones, so that devices like - > indentation, rather than delimiters, might become - > feasible for expressing local structure in the - > source language. - > - - Donald E. Knuth, December 1974 你也可以使用fileinput模块获得当前行的元信息 (meta information). 其中包括isfirstline,filename,lineno 使用 fileinput 模块处理多个文本文件 import fileinput import glob import string, sys for line in fileinput. input (glob.glob( "samples/*.txt" )): if fileinput.isfirstline(): # first in a file? sys.stderr.write( "-- reading %s --\n" % fileinput.filename()) sys.stdout.write( str (fileinput.lineno()) + " " + string.upper(line)) - - reading samples\sample.txt - - 1 WE WILL PERHAPS EVENTUALLY BE WRITING ONLY SMALL 2 MODULES WHICH ARE IDENTIFIED BY NAME AS THEY ARE 3 USED TO BUILD LARGER ONES, SO THAT DEVICES LIKE 4 INDENTATION, RATHER THAN DELIMITERS, MIGHT BECOME 5 FEASIBLE FOR EXPRESSING LOCAL STRUCTURE IN THE 6 SOURCE LANGUAGE. 7 - - DONALD E. KNUTH, DECEMBER 1974 文本文件的替换操作很简单. 只需要把inplace关键字参数设置为 1 , 传递给input函数, 该模块会帮你做好一切. 使用 fileinput 模块将 CRLF 改为 LF import fileinput, sys for line in fileinput. input (inplace = 1 ): # convert Windows/DOS text files to Unix files if line[ - 2 :] = = "\r\n" : line = line[: - 2 ] + "\n" sys.stdout.write(line) shutil实用模块包含了一些用于复制文件和文件夹的函数. 使用 shutil 复制文件 import shutil import os for file in os.listdir( "." ): if os.path.splitext( file )[ 1 ] = = ".py" : print file shutil.copy( file , os.path.join( "backup" , file )) aifc - example - 1.py anydbm - example - 1.py array - example - 1.py ... copytree函数用于复制整个目录树 (与cp -r相同), 而rmtree函数用于删除整个目录树 (与rm -r) 使用 shutil 模块复制/删除目录树 import shutil import os SOURCE = "samples" BACKUP = "samples-bak" # create a backup directory shutil.copytree(SOURCE, BACKUP) print os.listdir(BACKUP) # remove it shutil.rmtree(BACKUP) print os.listdir(BACKUP) [ 'sample.wav' , 'sample.jpg' , 'sample.au' , 'sample.msg' , 'sample.tgz' , ... Traceback (most recent call last): File "shutil-example-2.py" , line 17 , in ? print os.listdir(BACKUP) os.error: No such file or directory tempfile模块允许你快速地创建名称唯一的临时文件供使用. 使用 tempfile 模块创建临时文件 import tempfile import os tempfile = tempfile.mktemp() print "tempfile" , "=>" , tempfile file = open (tempfile, "w+b" ) file .write( "*" * 1000 ) file .seek( 0 ) print len ( file .read()), "bytes" file .close() try : # must remove file when done os.remove(tempfile) except OSError: pass tempfile = > C:\TEMP\~ 160 - 1 1000 bytes TemporaryFile函数会自动挑选合适的文件名, 并打开文件而且它会确保该文件在关闭的时候会被删除. (在 Unix 下, 你可以删除一个已打开的文件, 这 时文件关闭时它会被自动删除. 在其他平台上, 这通过一个特殊的封装类实现.) 使用 tempfile 模块打开临时文件 import tempfile file = tempfile.TemporaryFile() for i in range ( 100 ): file .write( "*" * 100 ) file .close() # removes the file! StringIO模块的使用. 它实现了一个工作在内存的文件对象 (内存文件). 在大多需要标准文件对象的地方都可以使用它来替换. 使用 StringIO 模块从内存文件读入内容 import StringIO MESSAGE = "That man is depriving a village somewhere of a computer scientist." file = StringIO.StringIO(MESSAGE) print file .read() That man is depriving a village somewhere of a computer scientist. StringIO类实现了内建文件对象的所有方法, 此外还有getvalue方法用来返回它内部的字符串值 使用 StringIO 模块向内存文件写入内容 import StringIO file = StringIO.StringIO() file .write( "This man is no ordinary man. " ) file .write( "This is Mr. F. G. Superman." ) print file .getvalue() This man is no ordinary man. This is Mr. F. G. Superman. 使用 StringIO 模块捕获输出 import StringIO import string, sys stdout = sys.stdout sys.stdout = file = StringIO.StringIO() print """ According to Gbaya folktales, trickery and guile are the best ways to defeat the python, king of snakes, which was hatched from a dragon at the world's start. -- National Geographic, May 1997 """ sys.stdout = stdout print string.upper( file .getvalue()) ACCORDING TO GBAYA FOLKTALES, TRICKERY AND GUILE ARE THE BEST WAYS TO DEFEAT THE PYTHON, KING OF SNAKES, WHICH WAS HATCHED FROM A DRAGON AT THE WORLD'S START. - - NATIONAL GEOGRAPHIC, MAY 1997 cStringIO是一个可选的模块, 是StringIO的更快速实现. 它的工作方式和StringIO基本相同, 但是它不可以被继承 使用 cStringIO 模块 import cStringIO MESSAGE = "That man is depriving a village somewhere of a computer scientist." file = cStringIO.StringIO(MESSAGE) print file .read() That man is depriving a village somewhere of a computer scientist. 为了让你的代码尽可能快, 但同时保证兼容低版本的 Python ,你可以使用一个小技巧在cStringIO不可用时启用StringIO模块, 后退至 StringIO try : import cStringIO StringIO = cStringIO except ImportError: import StringIO print StringIO <module 'StringIO' (built - in )> mmap模块提供了操作系统内存映射函数的接口,映射区域的行为和字符串对象类似, 但数据是直接从文件读取的. 使用 mmap 模块 import mmap import os filename = "samples/sample.txt" file = open (filename, "r+" ) size = os.path.getsize(filename) data = mmap.mmap( file .fileno(), size) # basics print data print len (data), size # use slicing to read from the file # 使用切片操作读取文件 print repr (data[: 10 ]), repr (data[: 10 ]) # or use the standard file interface # 或使用标准的文件接口 print repr (data.read( 10 )), repr (data.read( 10 )) <mmap object at 008A2A10 > 302 302 'We will pe' 'We will pe' 'We will pe' 'rhaps even' 在 Windows 下, 这个文件必须以既可读又可写的模式打开( `r+` , `w+` , 或 `a+` ), 否则mmap调用会失败. 对映射区域使用字符串方法和正则表达式 mport mmap import os, string, re def mapfile(filename): file = open (filename, "r+" ) size = os.path.getsize(filename) return mmap.mmap( file .fileno(), size) data = mapfile( "samples/sample.txt" ) # search index = data.find( "small" ) print index, repr (data[index - 5 :index + 15 ]) # regular expressions work too! m = re.search( "small" , data) print m.start(), m.group() 43 'only small\015\012modules ' 43 small ============================================================================== 本文转自被遗忘的博客园博客,原文链接:http://www.cnblogs.com/rollenholt/archive/2011/11/27/2264946.html,如需转载请自行联系原作者

优秀的个人博客,低调大师

python标准学习4

>>> os.environ["HOME"] 'C:\\Users\\Administrator' >>> os.getcwd() #获得当前的目录 'D:\\new' >>> os.getenv("QTDIR") #获取环境变量的值 'D:\\vs2010-qt-src-4.7.4\\qt-src-4.7.4' os.putenv(varname, value) #设置环境变量的值 os.mkdir(path[, mode]) >>> os.mkdir("aa") >>> os.rmdir("aa") >>>os.makedirs("aa\\bb\\cc") 多级目录 os.removedirs(path)¶ os.remove("d:\\new\\hello.txt") #删除文件,如果是目录的话,出错 os.rename("test.txt","a.txt") random.randint(a, b) Return a random integer N such that a <= N <= b. random.choice(seq) Return a random element from the non-empty sequence seq. If seq is empty, raises IndexError. random.random() Return the next random floating point number in the range [0.0, 1.0). random.shuffle(x[, random]) 随机排序序列 random.uniform(a, b)¶返回a<=N<=b之间的浮点数 random.randrange([start], stop[, step])想当于choice(range(start, stop, step)) >>> random.random() # Random float x, 0.0 <= x < 1.0 0.37444887175646646 >>> random.uniform(1, 10) # Random float x, 1.0 <= x < 10.0 1.1800146073117523 >>> random.randint(1, 10) # Integer from 1 to 10, endpoints included 7 >>> random.randrange(0, 101, 2) # Even integer from 0 to 100 26 >>> random.choice('abcdefghij') # Choose a random element 'c' >>> items = [1, 2, 3, 4, 5, 6, 7] >>> random.shuffle(items) >>> items [7, 3, 2, 5, 6, 4, 1] >>> random.sample([1, 2, 3, 4, 5], 3) # Choose 3 elements [4, 1, 5] >>> datetime.MAXYEAR 9999 >>> datetime.MINYEAR 1 >>> a=datetime.date(2011,2,1) >>> a.today() datetime.date(2011, 11, 26) >>> a.year 2011 >>> a.month 2 >>> a.day 1 >>> import time >>> from datetime import date >>> today = date.today() >>> today datetime.date(2007, 12, 5) >>> my_birthday = date(today.year, 6, 24) >>> if my_birthday < today: ... my_birthday = my_birthday.replace(year=today.year + 1) >>> my_birthday datetime.date(2008, 6, 24) >>> time_to_birthday = abs(my_birthday - today) #计算日期之差 >>> time_to_birthday.days 202 >>> datetime.now() #当前时间 datetime.datetime(2011, 11, 26, 10, 40, 10, 283000) >>> datetime.utcnow() datetime.datetime(2011, 11, 26, 2, 40, 34, 809000) >>> a=date(2005,7,14) #日期和时间进行合并 >>> t=time(12,30,12) >>> datetime.combine(a,t) datetime.datetime(2005, 7, 14, 12, 30, 12) >>> dt = datetime.strptime("21/11/06 16:30", "%d/%m/%y %H:%M") >>> dt datetime.datetime(2006, 11, 21, 16, 30) >>> from datetime import timedelta, datetime, tzinfo >>> class GMT1(tzinfo): ... def __init__(self): # DST starts last Sunday in March ... d = datetime(dt.year, 4, 1) # ends last Sunday in October ... self.dston = d - timedelta(days=d.weekday() + 1) ... d = datetime(dt.year, 11, 1) ... self.dstoff = d - timedelta(days=d.weekday() + 1) ... def utcoffset(self, dt): ... return timedelta(hours=1) + self.dst(dt) ... def dst(self, dt): ... if self.dston <= dt.replace(tzinfo=None) < self.dstoff: ... return timedelta(hours=1) ... else: ... return timedelta(0) ... def tzname(self,dt): ... return "GMT +1" ... >>> class GMT2(tzinfo): ... def __init__(self): ... d = datetime(dt.year, 4, 1) ... self.dston = d - timedelta(days=d.weekday() + 1) ... d = datetime(dt.year, 11, 1) ... self.dstoff = d - timedelta(days=d.weekday() + 1) ... def utcoffset(self, dt): ... return timedelta(hours=1) + self.dst(dt) ... def dst(self, dt): ... if self.dston <= dt.replace(tzinfo=None) < self.dstoff: ... return timedelta(hours=2) ... else: ... return timedelta(0) ... def tzname(self,dt): ... return "GMT +2" ... >>> gmt1 = GMT1() >>> # Daylight Saving Time >>> dt1 = datetime(2006, 11, 21, 16, 30, tzinfo=gmt1) >>> dt1.dst() datetime.timedelta(0) >>> dt1.utcoffset() datetime.timedelta(0, 3600) >>> dt2 = datetime(2006, 6, 14, 13, 0, tzinfo=gmt1) >>> dt2.dst() datetime.timedelta(0, 3600) >>> dt2.utcoffset() datetime.timedelta(0, 7200) >>> # Convert datetime to another time zone >>> dt3 = dt2.astimezone(GMT2()) >>> dt3 # doctest: +ELLIPSIS datetime.datetime(2006, 6, 14, 14, 0, tzinfo=<GMT2 object at 0x...>) >>> dt2 # doctest: +ELLIPSIS datetime.datetime(2006, 6, 14, 13, 0, tzinfo=<GMT1 object at 0x...>) >>> dt2.utctimetuple() == dt3.utctimetuple() True class datetime.time(hour[, minute[, second[, microsecond[, tzinfo]]]]) >>> a=time(10,46,12) >>> a.min datetime.time(0, 0) >>> a.max datetime.time(23, 59, 59, 999999) >>> a.hour 10 >>> a.minute 46 >>> a.second 12 >>> a.microsecond 0 class collections.Counter([iterable-or-mapping]) A Counter is a dict subclass for counting hashable objects. >>> # Tally occurrences of words in a list >>> cnt = Counter() >>> for word in ['red', 'blue', 'red', 'green', 'blue', 'blue']: ... cnt[word] += 1 >>> cnt Counter({'blue': 3, 'red': 2, 'green': 1}) >>> # Find the ten most common words in Hamlet >>> import re >>> words = re.findall('\w+', open('hamlet.txt').read().lower()) >>> Counter(words).most_common(10) [('the', 1143), ('and', 966), ('to', 762), ('of', 669), ('i', 631), ('you', 554), ('a', 546), ('my', 514), ('hamlet', 471), ('in', 451)] >>> c = Counter(['eggs', 'ham']) >>> c['bacon'] # count of a missing element is zero 0 >>> c['sausage'] = 0 # counter entry with a zero count >>> del c['sausage'] # del actually removes the entry >>> c = Counter(a=4, b=2, c=0, d=-2) >>> list(c.elements()) ['a', 'a', 'a', 'a', 'b', 'b'] most_common([n]) #出现次数最多的n个 >>> Counter('abracadabra').most_common(3) [('a', 5), ('r', 2), ('b', 2)] >>> c = Counter(a=4, b=2, c=0, d=-2) >>> d = Counter(a=1, b=2, c=3, d=4) >>> c.subtract(d) Counter({'a': 3, 'b': 0, 'c': -3, 'd': -6}) >>> c = Counter(a=4, b=2, c=0, d=-2) >>> sum(c.values()) # total of all counts 4 >>> list(c) ['a', 'c', 'b', 'd'] >>> set(c) set(['a', 'c', 'b', 'd']) >>> dict(c) {'a': 4, 'c': 0, 'b': 2, 'd': -2} >>> c.items() [('a', 4), ('c', 0), ('b', 2), ('d', -2)] >>> c.most_common()[:-2:-1] # c.most_common()[:-n:-1] n least #common elements [('d', -2)] >>> c+=Counter() >>> c Counter({'a': 4, 'b': 2}) >>> c.clear() >>> c Counter() >>> c = Counter(a=3, b=1) >>> d = Counter(a=1, b=2) >>> c + d # add two counters together: c[x] + d[x] Counter({'a': 4, 'b': 3}) >>> c - d # subtract (keeping only positive counts) Counter({'a': 2}) >>> c & d # intersection: min(c[x], d[x]) Counter({'a': 1, 'b': 1}) >>> c | d # union: max(c[x], d[x]) Counter({'a': 3, 'b': 2}) >>> from collections import deque >>> d = deque('ghi') # make a new deque with three items >>> for elem in d: # iterate over the deque's elements ... print elem.upper() G H I >>> d.append('j') # add a new entry to the right side >>> d.appendleft('f') # add a new entry to the left side >>> d # show the representation of the deque deque(['f', 'g', 'h', 'i', 'j']) >>> d.pop() # return and remove the rightmost item 'j' >>> d.popleft() # return and remove the leftmost item 'f' >>> list(d) # list the contents of the deque ['g', 'h', 'i'] >>> d[0] # peek at leftmost item 'g' >>> d[-1] # peek at rightmost item 'i' >>> list(reversed(d)) # list the contents of a deque in reverse ['i', 'h', 'g'] >>> 'h' in d # search the deque True >>> d.extend('jkl') # add multiple elements at once >>> d deque(['g', 'h', 'i', 'j', 'k', 'l']) >>> d.rotate(1) # right rotation >>> d deque(['l', 'g', 'h', 'i', 'j', 'k']) >>> d.rotate(-1) # left rotation >>> d deque(['g', 'h', 'i', 'j', 'k', 'l']) >>> deque(reversed(d)) # make a new deque in reverse order deque(['l', 'k', 'j', 'i', 'h', 'g']) >>> d.clear() # empty the deque >>> d.pop() # cannot pop from an empty deque Traceback (most recent call last): File "<pyshell#6>", line 1, in -toplevel- d.pop() IndexError: pop from an empty deque >>> d.extendleft('abc') # extendleft() reverses the input order >>> d deque(['c', 'b', 'a']) def tail(filename, n=10): 'Return the last n lines of a file' return deque(open(filename), n) def moving_average(iterable, n=3): # moving_average([40, 30, 50, 46, 39, 44]) --> 40.0 42.0 45.0 43.0 # http://en.wikipedia.org/wiki/Moving_average it = iter(iterable) d = deque(itertools.islice(it, n-1)) d.appendleft(0) s = sum(d) for elem in it: s += elem - d.popleft() d.append(elem) yield s / float(n) def delete_nth(d, n): d.rotate(-n) d.popleft() d.rotate(n) class collections.defaultdict([default_factory[, ...]]) >>> s = [('yellow', 1), ('blue', 2), ('yellow', 3), ('blue', 4), ('red', 1)] >>> d = defaultdict(list) >>> for k, v in s: ... d[k].append(v) ... >>> d.items() [('blue', [2, 4]), ('red', [1]), ('yellow', [1, 3])] >>> d = {} >>> for k, v in s: ... d.setdefault(k, []).append(v) ... >>> d.items() [('blue', [2, 4]), ('red', [1]), ('yellow', [1, 3])] >>> s = 'mississippi' >>> d = defaultdict(int) >>> for k in s: ... d[k] += 1 ... >>> d.items() [('i', 4), ('p', 2), ('s', 4), ('m', 1)] >>> s = [('red', 1), ('blue', 2), ('red', 3), ('blue', 4), ('red', 1), ('blue', 4)] >>> d = defaultdict(set) >>> for k, v in s: ... d[k].add(v) ... >>> d.items() [('blue', set([2, 4])), ('red', set([1, 3]))] >>> def heapsort(iterable): ... 'Equivalent to sorted(iterable)' ... h = [] ... for value in iterable: ... heappush(h, value) ... return [heappop(h) for i in range(len(h))] ... >>> heapsort([1, 3, 5, 7, 9, 2, 4, 6, 8, 0]) [0, 1, 2, 3, 4, 5, 6, 7, 8, 9] >>> h = [] >>> heappush(h, (5, 'write code')) >>> heappush(h, (7, 'release product')) >>> heappush(h, (1, 'write spec')) >>> heappush(h, (3, 'create tests')) >>> heappop(h) (1, 'write spec') #coding=utf-8 #堆的实例 from heapq import heappush, heappop, heappushpop, heapify, heapreplace, nlargest,\ nsmallest heap=[] heappush(heap,"A"); heappush(heap,"C"); heappush(heap,"B"); print heap heappop(heap) #弹出堆中最小的元素 print heap var=heappushpop(heap,"D") #返回并弹出堆中最小的元素,并且将D压入堆 print var print heap var=heapreplace(heap,"E") #返回并弹出堆中最小的元素,并且将D压入堆, print var print heap list=[1,2,3,4,5,6,7,8,9,0] heapify(list); print list print nlargest(3,list) #返回堆中最大的3个 print nsmallest(3,list) #返回堆中最小的3个 ============================================================================== 本文转自被遗忘的博客园博客,原文链接:http://www.cnblogs.com/rollenholt/archive/2011/11/26/2264225.html,如需转载请自行联系原作者

优秀的个人博客,低调大师

python标准学习6

使用 apply 函数 def function(a, b): print a, b apply (function, ( "whither" , "canada?" )) apply (function, ( 1 , 2 + 3 )) whither canada? 1 5 使用 apply 函数传递关键字参数 def function(a, b): print a, b apply (function, ( "crunchy" , "frog" )) apply (function, ( "crunchy" ,), { "b" : "frog" }) apply (function, (), { "a" : "crunchy" , "b" : "frog" }) crunchy frog crunchy frog crunchy frog 使用 apply 函数调用基类的构造函数 class Rectangle: def _ _init_ _( self , color = "white" , width = 10 , height = 10 ): print "create a" , color, self , "sized" , width, "x" , height class RoundedRectangle(Rectangle): def _ _init_ _( self , * * kw): apply (Rectangle._ _init_ _, ( self ,), kw) rect = Rectangle(color = "green" , height = 100 , width = 100 ) rect = RoundedRectangle(color = "blue" , height = 20 ) create a green <Rectangle instance at 8c8260 > sized 100 x 100 create a blue <RoundedRectangle instance at 8c84c0 > sized 10 x 20 使用 _ _import_ _ 函数获得特定函数 def getfunctionbyname(module_name, function_name): module = _ _import_ _(module_name) return getattr (module, function_name) print repr (getfunctionbyname( "dumbdbm" , "open" )) <function open at 794fa0 > 使用 _ _import_ _ 函数实现 延迟导入 class LazyImport: def _ _init_ _( self , module_name): self .module_name = module_name self .module = None def _ _getattr_ _( self , name): if self .module is None : self .module = _ _import_ _( self .module_name) return getattr ( self .module, name) string = LazyImport( "string" ) print string.lowercase abcdefghijklmnopqrstuvwxyz 使用 dir 函数 def dump(value): print value, "=>" , dir (value) import sys dump( 0 ) dump( 1.0 ) dump( 0.0j ) # complex number dump([]) # list dump({}) # dictionary dump( "string" ) dump( len ) # function dump(sys) # module 0 = > [] 1.0 = > [] 0j = > [ 'conjugate' , 'imag' , 'real' ] [] = > [ 'append' , 'count' , 'extend' , 'index' , 'insert' , 'pop' , 'remove' , 'reverse' , 'sort' ] {} = > [ 'clear' , 'copy' , 'get' , 'has_key' , 'items' , 'keys' , 'update' , 'values' ] string = > [] <built - in function len > = > [ '_ _doc_ _' , '_ _name_ _' , '_ _self_ _' ] <module 'sys' (built - in )> = > [ '_ _doc_ _' , '_ _name_ _' , '_ _stderr_ _' , '_ _stdin_ _' , '_ _stdout_ _' , 'argv' , 'builtin_module_names' , 'copyright' , 'dllhandle' , 'exc_info' , 'exc_type' , 'exec_prefix' , 'executable' , ... 使用 dir 函数查找类的所有成员 class A: def a( self ): pass def b( self ): pass class B(A): def c( self ): pass def d( self ): pass def getmembers(klass, members = None ): # get a list of all class members, ordered by class if members is None : members = [] for k in klass._ _bases_ _: getmembers(k, members) for m in dir (klass): if m not in members: members.append(m) return members print getmembers(A) print getmembers(B) print getmembers(IOError) [ '_ _doc_ _' , '_ _module_ _' , 'a' , 'b' ] [ '_ _doc_ _' , '_ _module_ _' , 'a' , 'b' , 'c' , 'd' ] [ '_ _doc_ _' , '_ _getitem_ _' , '_ _init_ _' , '_ _module_ _' , '_ _str_ _' ] 使用 callable 函数 def dump(function): if callable (function): print function, "is callable" else : print function, "is *not* callable" class A: def method( self , value): return value class B(A): def _ _call_ _( self , value): return value a = A() b = B() dump( 0 ) # simple objects dump( "string" ) dump( callable ) dump(dump) # function dump(A) # classes dump(B) dump(B.method) dump(a) # instances dump(b) dump(b.method) 0 is * not * callable string is * not * callable <built - in function callable > is callable <function dump at 8ca320 > is callable A is callable B is callable <unbound method A.method> is callable <A instance at 8caa10 > is * not * callable <B instance at 8cab00 > is callable <method A.method of B instance at 8cab00 > is callable 使用 eval 函数 def dump(expression): result = eval (expression) print expression, "=>" , result, type (result) dump( "1" ) dump( "1.0" ) dump( "'string'" ) dump( "1.0 + 2.0" ) dump( "'*' * 10" ) dump( "len('world')" ) 1 = > 1 < type 'int' > 1.0 = > 1.0 < type 'float' > 'string' = > string < type 'string' > 1.0 + 2.0 = > 3.0 < type 'float' > '*' * 10 = > * * * * * * * * * * < type 'string' > len ( 'world' ) = > 5 < type 'int' > 使用 eval 函数执行任意命令 print eval ( "_ _import_ _('os').getcwd()" ) print eval ( "_ _import_ _('os').remove('file')" ) / home / fredrik / librarybook Traceback (innermost last): File "builtin-eval-example-2" , line 2 , in ? File "<string>" , line 0 , in ? os.error: ( 2 , 'No such file or directory' ) 使用 compile 函数检查语法 NAME = "script.py" BODY = """ prnt 'owl-stretching time' """ try : compile (BODY, NAME, "exec" ) except SyntaxError, v: print "syntax error:" , v, "in" , NAME # syntax error: invalid syntax in script.py 执行已编译的代码 BODY = """ print 'the ant, an introduction' """ code = compile (BODY, "<script>" , "exec" ) print code exec code <code object ? at 8c6be0 , file "<script>" , line 0 > the ant, an introduction Python 还提供了execfile函数, 一个从文件加载代码, 编译代码, 执行代码的快捷方式. 使用 execfile 函数 execfile ( "hello.py" ) def EXECFILE (filename, locals = None , globals = None ): exec compile ( open (filename).read(), filename, "exec" ) in locals , globals EXECFILE ( "hello.py" ) hello again, and welcome to the show hello again, and welcome to the show 显式地访问 _ _builtin_ _ 模块中的函数 def open (filename, mode = "rb" ): import _ _builtin_ _ file = _ _builtin_ _. open (filename, mode) if file .read( 5 ) not in ( "GIF87" , "GIF89" ): raise IOError, "not a GIF file" file .seek( 0 ) return file fp = open ( "samples/sample.gif" ) print len (fp.read()), "bytes" fp = open ( "samples/sample.jpg" ) print len (fp.read()), "bytes" 3565 bytes Traceback (innermost last): File "builtin-open-example-1.py" , line 12 , in ? File "builtin-open-example-1.py" , line 5 , in open IOError: not a GIF file 使用 exceptions 模块 # python imports this module by itself, so the following # line isn't really needed # python 会自动导入该模块, 所以以下这行是不必要的 # import exceptions class HTTPError(Exception): # indicates an HTTP protocol error def _ _init_ _( self , url, errcode, errmsg): self .url = url self .errcode = errcode self .errmsg = errmsg def _ _str_ _( self ): return ( "<HTTPError for %s: %s %s>" % ( self .url, self .errcode, self .errmsg) ) try : raise HTTPError( "http://www.python.org/foo" , 200 , "Not Found" ) except HTTPError, error: print "url" , "=>" , error.url print "errcode" , "=>" , error.errcode print "errmsg" , "=>" , error.errmsg raise # reraise exception url = > http: / / www.python.org / foo errcode = > 200 errmsg = > Not Found Traceback (innermost last): File "exceptions-example-1" , line 16 , in ? HTTPError: <HTTPError for http: / / www.python.org / foo: 200 Not Found> 使用 os 模块重命名和删除文件 import os import string def replace( file , search_for, replace_with): # replace strings in a text file back = os.path.splitext( file )[ 0 ] + ".bak" temp = os.path.splitext( file )[ 0 ] + ".tmp" try : # remove old temp file, if any os.remove(temp) except os.error: pass fi = open ( file ) fo = open (temp, "w" ) for s in fi.readlines(): fo.write(string.replace(s, search_for, replace_with)) fi.close() fo.close() try : # remove old backup file, if any os.remove(back) except os.error: pass # rename original to backup... os.rename( file , back) # ...and temporary to original os.rename(temp, file ) # # try it out! file = "samples/sample.txt" replace( file , "hello" , "tjena" ) replace( file , "tjena" , "hello" ) 使用 os 列出目录下的文件 import os for file in os.listdir( "samples" ): print file sample.au sample.jpg sample.wav ... getcwd和chdir函数分别用于获得和改变当前工作目录 使用 os 模块改变当前工作目录 import os # where are we? cwd = os.getcwd() print "1" , cwd # go down os.chdir( "samples" ) print "2" , os.getcwd() # go back up os.chdir(os.pardir) print "3" , os.getcwd() 1 / ematter / librarybook 2 / ematter / librarybook / samples 3 / ematter / librarybook makedirs和removedirs函数用于创建或删除目录层 使用 os 模块创建/删除多个目录级 import os os.makedirs( "test/multiple/levels" ) fp = open ( "test/multiple/levels/file" , "w" ) fp.write( "inspector praline" ) fp.close() # remove the file os.remove( "test/multiple/levels/file" ) # and all empty directories above it os.removedirs( "test/multiple/levels" ) removedirs函数会删除所给路径中最后一个目录下所有的空目录. 而mkdir和rmdir函数只能处理单个目录级 使用 os 模块创建/删除目录 import os os.mkdir( "test" ) os.rmdir( "test" ) os.rmdir( "samples" ) # this will fail Traceback (innermost last): File "os-example-7" , line 6 , in ? OSError: [Errno 41 ] Directory not empty: 'samples' 如果需要删除非空目录, 你可以使用shutil模块中的rmtree函数 >>> import shutil shutil.rmtree( "d:\\a" ) 复制文件目录(包括内部文件) >>> shutil.copytree( "d:\\new" , "d:\\a" ) 复制文件操作: shutil.copyfile( "d:\\new\\a.txt" , "d:\\a.txt" ) 目录或文件的移动操作 shutil.move( "d:\\new\\a.txt" , "d:\\" ) 使用 os 模块获取文件属性 import os import time file = "samples/sample.jpg" def dump(st): mode, ino, dev, nlink, uid, gid, size, atime, mtime, ctime = st print "- size:" , size, "bytes" print "- owner:" , uid, gid print "- created:" , time.ctime(ctime) print "- last accessed:" , time.ctime(atime) print "- last modified:" , time.ctime(mtime) print "- mode:" , oct (mode) print "- inode/dev:" , ino, dev # # get stats for a filename st = os.stat( file ) print "stat" , file dump(st) print # # get stats for an open file fp = open ( file ) st = os.fstat(fp.fileno()) print "fstat" , file dump(st) stat samples / sample.jpg - size: 4762 bytes - owner: 0 0 - created: Tue Sep 07 22 : 45 : 58 1999 - last accessed: Sun Sep 19 00 : 00 : 00 1999 - last modified: Sun May 19 01 : 42 : 16 1996 - mode: 0100666 - inode / dev: 0 2 fstat samples / sample.jpg - size: 4762 bytes - owner: 0 0 - created: Tue Sep 07 22 : 45 : 58 1999 - last accessed: Sun Sep 19 00 : 00 : 00 1999 - last modified: Sun May 19 01 : 42 : 16 1996 - mode: 0100666 - inode / dev: 0 0 可以使用chmod和utime函数修改文件的权限模式和时间属性 使用 os 模块修改文件的权限和时间戳 import os import stat, time infile = "samples/sample.jpg" outfile = "out.jpg" # copy contents fi = open (infile, "rb" ) fo = open (outfile, "wb" ) while 1 : s = fi.read( 10000 ) if not s: break fo.write(s) fi.close() fo.close() # copy mode and timestamp st = os.stat(infile) os.chmod(outfile, stat.S_IMODE(st[stat.ST_MODE])) os.utime(outfile, (st[stat.ST_ATIME], st[stat.ST_MTIME])) print "original" , "=>" print "mode" , oct (stat.S_IMODE(st[stat.ST_MODE])) print "atime" , time.ctime(st[stat.ST_ATIME]) print "mtime" , time.ctime(st[stat.ST_MTIME]) print "copy" , "=>" st = os.stat(outfile) print "mode" , oct (stat.S_IMODE(st[stat.ST_MODE])) print "atime" , time.ctime(st[stat.ST_ATIME]) print "mtime" , time.ctime(st[stat.ST_MTIME]) original = > mode 0666 atime Thu Oct 14 15 : 15 : 50 1999 mtime Mon Nov 13 15 : 42 : 36 1995 copy = > mode 0666 atime Thu Oct 14 15 : 15 : 50 1999 mtime Mon Nov 13 15 : 42 : 36 1995 system函数在当前进程下执行一个新命令, 并等待它完成 使用 os 执行操作系统命令 import os if os.name = = "nt" : command = "dir" else : command = "ls -l" os.system(command) - rwxrw - r - - 1 effbot effbot 76 Oct 9 14 : 17 README - rwxrw - r - - 1 effbot effbot 1727 Oct 7 19 : 00 SimpleAsyncHTTP.py - rwxrw - r - - 1 effbot effbot 314 Oct 7 20 : 29 aifc - example - 1.py - rwxrw - r - - 1 effbot effbot 259 Oct 7 20 : 38 anydbm - example - 1.py ... 命令通过操作系统的标准 shell 执行, 并返回 shell 的退出状态. 需要注意的是在 Windows 下, shell 通常是command.com, 它的推出状态总是 0. exec函数会使用新进程替换当前进程(或者说是"转到进程"). 使用 os 模块启动新进程 import os import sys program = "python" arguments = [ "hello.py" ] print os.execvp(program, (program,) + tuple (arguments)) print "goodbye" hello again, and welcome to the show execvp函数, 它会从标准路径搜索执行程序, 把第二个参数(元组)作为单独的参数传递给程序, 并使用当前的环境变量来运行程序. 其他七个同类型函数请参阅Python Library Reference. 在 Unix 环境下, 你可以通过组合使用exec,fork以及wait函数来从当前程序调用另一个程序,fork函数复制当前进程,wait函数会等待一个子进程执行结束. 使用 os 模块调用其他程序 (Unix) import os import sys def run(program, * args): pid = os.fork() if not pid: os.execvp(program, (program,) + args) return os.wait()[ 0 ] run( "python" , "hello.py" ) print "goodbye" hello again, and welcome to the show goodbye fork函数在子进程返回中返回 0 (这个进程首先从fork返回值), 在父进程中返回一个非 0 的进程标识符(子进程的 PID ). 也就是说, 只有当我们处于子进程的时候 "not pid" 才为真. fork和wait函数在 Windows 上是不可用的, 但是你可以使用spawn函数不过,spawn不会沿着路径搜索可执行文件, 你必须自己处理好这些. 或许系统当前的“PATH”变量的值 >>> import string >>> import os >>> for path in string.split(os.environ[ "PATH" ],os.pathsep): print path C:\Program Files\NVIDIA Corporation\PhysX\Common d:\program files\Python27\Lib\site - packages\PyQt4 C:\windows\system32 C:\windows C:\windows\System32\Wbem C:\windows\System32\WindowsPowerShell\v1. 0 \ C:\Program Files\Common Files\Thunder Network\KanKan\Codecs D:\Program Files\python D:\Program Files\Java\jdk1. 6.0_23 / bin D:\Program Files\Java\jdk1. 6.0_23 / jre / bin C:\Program Files\Microsoft SQL Server\ 90 \Tools\binn\ D:\vs2010 - qt - src - 4.7 . 4 \qt - src - 4.7 . 4 \ bin C:\Program Files\Intel\WiFi\ bin \ C:\Program Files\Common Files\Intel\WirelessCommon\ C:\Program Files\Lenovo\Bluetooth Software\ D:\vs2010 - qt - src - 4.7 . 4 \qt - src - 4.7 . 4 \ bin D:\vs2010 - qt - src - 4.7 . 4 \qt - src - 4.7 . 4 \lib D:\vs2010 - qt - src - 4.7 . 4 \qt - src - 4.7 . 4 \include D:\Qt\ 4.7 . 4 \ bin >>> 使用 os 模块调用其他程序 (Windows) import os import string def run(program, * args): # find executable for path in string.split(os.environ[ "PATH" ], os.pathsep): file = os.path.join(path, program) + ".exe" try : return os.spawnv(os.P_WAIT, file , ( file ,) + args) except os.error: pass raise os.error, "cannot find executable" run( "python" , "hello.py" ) print "goodbye" hello again, and welcome to the show goodbye spawn函数还可用于在后台运行一个程序.下面这个例子给run函数添加了一个可选的mode参数; 当设置为os.P_NOWAIT时, 这个脚本不会等待子程序结束, 默认值os.P_WAIT时spawn会等待子进程结束. 其它的标志常量还有os.P_OVERLAY,它使得spawn的行为和exec类似, 以及os.P_DETACH, 它在后台运行子进程, 与当前控制台和键盘焦点隔离. import os import string def run(program, * args, * * kw): # find executable mode = kw.get( "mode" , os.P_WAIT) for path in string.split(os.environ[ "PATH" ], os.pathsep): file = os.path.join(path, program) + ".exe" try : return os.spawnv(mode, file , ( file ,) + args) except os.error: pass raise os.error, "cannot find executable" run( "python" , "hello.py" , mode = os.P_NOWAIT) print "goodbye" goodbye hello again, and welcome to the show 下面这个例子提供了一个在 Unix 和 Windows 平台上通用的spawn方法 使用 spawn 或 fork/exec 调用其他程序 import os import string if os.name in ( "nt" , "dos" ): exefile = ".exe" else : exefile = "" def spawn(program, * args): try : # possible 2.0 shortcut! return os.spawnvp(program, (program,) + args) except AttributeError: pass try : spawnv = os.spawnv except AttributeError: # assume it's unix pid = os.fork() if not pid: os.execvp(program, (program,) + args) return os.wait()[ 0 ] else : # got spawnv but no spawnp: go look for an executable for path in string.split(os.environ[ "PATH" ], os.pathsep): file = os.path.join(path, program) + exefile try : return spawnv(os.P_WAIT, file , ( file ,) + args) except os.error: pass raise IOError, "cannot find executable" # # try it out! spawn( "python" , "hello.py" ) print "goodbye" hello again, and welcome to the show goodbye 处理守护进程 Unix 系统中, 你可以使用fork函数把当前进程转入后台(一个"守护者/daemon"). 一般来说, 你需要派生(fork off)一个当前进程的副本, 然后终止原进程 使用 os 模块使脚本作为守护执行 (Unix) import os import time pid = os.fork() if pid: os._exit( 0 ) # kill original print "daemon started" time.sleep( 10 ) print "daemon terminated" 使用 os 模块终止当前进程 importos importsys try: sys.exit(1) exceptSystemExit, value: print"caught exit(%s)"%value try: os._exit(2) exceptSystemExit, value: print"caught exit(%s)"%value print"bye!" caught exit(1) 本文转自被遗忘的博客园博客,原文链接:http://www.cnblogs.com/rollenholt/archive/2011/11/26/2264597.html,如需转载请自行联系原作者

资源下载

更多资源
Mario

Mario

马里奥是站在游戏界顶峰的超人气多面角色。马里奥靠吃蘑菇成长,特征是大鼻子、头戴帽子、身穿背带裤,还留着胡子。与他的双胞胎兄弟路易基一起,长年担任任天堂的招牌角色。

Nacos

Nacos

Nacos /nɑ:kəʊs/ 是 Dynamic Naming and Configuration Service 的首字母简称,一个易于构建 AI Agent 应用的动态服务发现、配置管理和AI智能体管理平台。Nacos 致力于帮助您发现、配置和管理微服务及AI智能体应用。Nacos 提供了一组简单易用的特性集,帮助您快速实现动态服务发现、服务配置、服务元数据、流量管理。Nacos 帮助您更敏捷和容易地构建、交付和管理微服务平台。

Spring

Spring

Spring框架(Spring Framework)是由Rod Johnson于2002年提出的开源Java企业级应用框架,旨在通过使用JavaBean替代传统EJB实现方式降低企业级编程开发的复杂性。该框架基于简单性、可测试性和松耦合性设计理念,提供核心容器、应用上下文、数据访问集成等模块,支持整合Hibernate、Struts等第三方框架,其适用范围不仅限于服务器端开发,绝大多数Java应用均可从中受益。

Rocky Linux

Rocky Linux

Rocky Linux(中文名:洛基)是由Gregory Kurtzer于2020年12月发起的企业级Linux发行版,作为CentOS稳定版停止维护后与RHEL(Red Hat Enterprise Linux)完全兼容的开源替代方案,由社区拥有并管理,支持x86_64、aarch64等架构。其通过重新编译RHEL源代码提供长期稳定性,采用模块化包装和SELinux安全架构,默认包含GNOME桌面环境及XFS文件系统,支持十年生命周期更新。

用户登录
用户注册