>

Python 物件導向與系統程式

進階 更新於 2026 約 45 分鐘
📋 目錄
  1. 類別、繼承、多型、封裝
  2. 檔案操作 — CSV / JSON / TXT 讀寫
  3. 裝飾器與閉包
  4. 生成器與迭代器
  5. 併發程式設計 — Threading 與 Multiprocessing

一、類別、繼承、多型、封裝

Python 是一門「物件導向程式設計(Object-Oriented Programming, OOP)」語言,幾乎所有東西都是物件。掌握 OOP 概念是邁向進階開發者的必經之路。本章節將從類別的基本結構談起,一路探討封裝、繼承、多型,以及 Python 特有的魔術方法與 MRO(方法解析順序)。

1.1 類別的基本結構

類別(Class)是用來建立物件(Object)的「藍圖」。在 Python 中,使用 class 關鍵字來定義類別。

步驟 1

建立第一個類別

class Dog:
    """這是一個 Dog 類別"""

    # 類別屬性(所有实例共享)
    species = "犬科"

    # 建構子(初始化)
    def __init__(self, name, age):
        # 實例屬性
        self.name = name
        self.age = age

    # 實例方法
    def bark(self):
        return f"{self.name} 說:汪汪!"

    def __str__(self):
        return f"{self.name},{self.age}歲"


# 建立實例
buddy = Dog("阿福", 3)
max    = Dog("大毛", 5)

print(buddy.bark())          # 阿福 說:汪汪!
print(str(buddy))            # 阿福,3歲
print(Dog.species)            # 犬科
print(buddy.species)          # 犬科(通過實例訪問)
Self 是什麼?
self 是對「目前實例物件」的參考。當我們呼叫 buddy.bark() 時,Python 內部會把它轉成 Dog.bark(buddy)。沒有 self,方法就無法知道操作的是哪一個實例。

1.2 @property 封裝屬性

封裝(Encapsulation)是把資料包裝在類別內部,並透過公開的介面(方法)來存取。Python 沒有真正的 private 關鍵字,但透過命名慣例(底線底線前綴 __)與 @property 裝飾器,可以做到屬性封裝。

class BankAccount:
    def __init__(self, owner, balance):
        self.owner = owner
        self.__balance = balance   # 雙底線前綴 → 名稱修飾(name mangling),外部難以直接訪問

    @property
    def balance(self):
        """唯讀屬性:取得帳戶餘額"""
        return self.__balance

    @property
    def balance_formatted(self):
        """唯讀屬性:格式化後的餘額"""
        return f"NT${self.__balance:,.0f}"

    def deposit(self, amount):
        if amount <= 0:
            raise ValueError("存款金額必須為正數")
        self.__balance += amount

    def withdraw(self, amount):
        if amount > self.__balance:
            raise ValueError("餘額不足")
        self.__balance -= amount


account = BankAccount("王小明", 10000)

print(account.balance)             # 10000
print(account.balance_formatted)   # NT$10,000
account.deposit(5000)
print(account.balance_formatted)   # NT$15,000

# 試圖直接修改(會失敗)
# account.__balance = -999999  # AttributeError: 'BankAccount' object has no attribute '__balance'

account.withdraw(3000)
print(account.balance_formatted)   # NT$12,000
封裝的好處:內部實作可以自由改變(例如改用資料庫儲存),但只要介面(屬性 / 方法名稱)不變,所有外部程式碼都不需要修改。

1.3 繼承與 super()

繼承(Inheritance)讓我們建立一個「子類別」,複用父類別的屬性與方法,並可覆寫(override)或擴充功能。

步驟 2

單一繼承

class Animal:
    def __init__(self, name):
        self.name = name

    def speak(self):
        raise NotImplementedError("子類必須實作 speak()")


class Cat(Animal):
    def speak(self):
        return f"{self.name} 說:喵嗚~"


class Dog2(Animal):
    def speak(self):
        return f"{self.name} 說:汪嗚!"


class Bird(Animal):
    def speak(self):
        return f"{self.name} 說:啾啾!"


animals = [Cat("小橘"), Dog2("阿福"), Bird("小嘰")]
for a in animals:
    print(a.speak())
# 小橘 說:喵嗚~
# 阿福 說:汪嗚!
# 小嘰 說:啾啾!
步驟 3

super() 呼叫父類方法

class Employee:
    def __init__(self, name, salary):
        self.name = name
        self.salary = salary

    def give_bonus(self, percent):
        self.salary *= (1 + percent / 100)


class Manager(Employee):
    def __init__(self, name, salary, team_size):
        # super().__init__() 呼叫父類的 __init__
        super().__init__(name, salary)
        self.team_size = team_size

    def give_bonus(self, percent):
        # 主管有額外 5% 獎金加成
        extra = percent + 5
        super().give_bonus(extra)


tom = Manager("湯姆", 50000, 8)
tom.give_bonus(10)        # 10% + 5% = 15%
print(f"{tom.name} 的薪水:{tom.salary}")  # 57500

1.4 多重繼承與 MRO

Python 支援多重繼承(一个類別繼承多個父類),但必須了解 MRO(Method Resolution Order,方法解析順序)。使用 類別.__mro__類別.mro() 可查看順序。

class A:
    def greet(self):
        return "A 說你好"


class B:
    def greet(self):
        return "B 說你好"


class C(A, B):       # 先繼承 A,再繼承 B
    pass


c = C()
print(c.greet())     # A 說你好
print(C.__mro__)
# (<class '__main__.C'>, <class '__main__.A'>, <class '__main__.B'>, <class 'object'>)

# 調用 B 的方法(繞過 MRO)
print(B.greet(c))    # B 說你好
MRO 採用 C3 線性化演算法。原則是:子類永遠在父類之前,同一層級由左到右優先。如果多重繼承設計得太複雜,程式碼會難以維護,建議多用「組合(Composition)」而非繼承。

1.5 多型(Polymorphism)

多型的核心精神是:「不同物件提供同一個介面,但產生不同行為。」Python 本身就是「鴨子型別(Duck Typing)」,不需明確宣告介面,只要物件有該方法就能用。

class Rectangle:
    def __init__(self, w, h):
        self.w, self.h = w, h
    def area(self):
        return self.w * self.h

class Circle:
    def __init__(self, r):
        self.r = r
    def area(self):
        return 3.14159 * self.r ** 2

class Triangle:
    def __init__(self, b, h):
        self.b, self.h = b, h
    def area(self):
        return 0.5 * self.b * self.h


shapes = [Rectangle(4, 5), Circle(3), Triangle(6, 4)]
total = sum(s.area() for s in shapes)
print(f"總面積:{total:.2f}")   # 總面積:73.10

1.6 魔術方法(Magic Methods)

Python 類別中,以雙底線 __ 開頭和結尾的方法,稱為「魔術方法」或「特殊方法」。它們用於運算子重載、物件初始化、字串表達等情境。

class Vector:
    def __init__(self, x, y):
        self.x, self.y = x, y

    def __add__(self, other):
        """+ 運算子"""
        return Vector(self.x + other.x, self.y + other.y)

    def __sub__(self, other):
        """- 運算子"""
        return Vector(self.x - other.x, self.y - other.y)

    def __eq__(self, other):
        """== 運算子"""
        return self.x == other.x and self.y == other.y

    def __repr__(self):
        """除錯用字串表達"""
        return f"Vector({self.x}, {self.y})"

    def __abs__(self):
        """abs() 內建函式"""
        import math
        return math.hypot(self.x, self.y)


v1 = Vector(3, 4)
v2 = Vector(1, 2)

print(v1 + v2)        # Vector(4, 6)
print(v1 - v2)        # Vector(2, 2)
print(v1 == v2)       # False
print(abs(v1))        # 5.0

✏️ 練習題:設計一個書籍管理系統

建立 Book 類別,包含書名、作者、頁數屬性,並實作 __str____lt__(用於依頁數排序)。再建立子類別 EBook,多一個檔案大小屬性,並透過 super() 呼叫父類 __init__。

二、檔案操作 — CSV / JSON / TXT 讀寫

幾乎所有系統程式都需要處理檔案 I/O。本章節涵蓋文字檔、CSV 與 JSON 三種常見格式,並以一個「CLI 記事本應用」實作貫穿所有技術。

2.1 文字檔讀寫

Python 使用 open() 函式開啟檔案,推薦使用 with 語句自動關閉資源。

# 寫入文字檔
with open("notes.txt", "w", encoding="utf-8") as f:
    f.write("第一行記事\n")
    f.write("第二行記事\n")

# 讀取文字檔(逐行)
with open("notes.txt", "r", encoding="utf-8") as f:
    for line in f:
        print(line.rstrip("\n"))

# 讀取全部內容
with open("notes.txt", "r", encoding="utf-8") as f:
    content = f.read()
    print("檔案內容:", content)

2.2 CSV 檔案處理

Python 內建的 csv 模組適合處理表格資料。

import csv

# 寫入 CSV
students = [
    ["學號", "姓名", "成績"],
    ["A001", "王小明", 88],
    ["A002", "陳美麗", 92],
    ["A003", "張大頭", 75],
]

with open("students.csv", "w", encoding="utf-8", newline="") as f:
    writer = csv.writer(f)
    writer.writerows(students)

# 讀取 CSV(以字典方式)
with open("students.csv", "r", encoding="utf-8", newline="") as f:
    reader = csv.DictReader(f)
    for row in reader:
        print(f"學號:{row['學號']},姓名:{row['姓名']},成績:{row['成績']}")

# 讀取為列表
with open("students.csv", "r", encoding="utf-8", newline="") as f:
    rows = list(csv.reader(f))
    for r in rows[1:]:   # 跳過標題列
        print(r)

2.3 JSON 檔案處理

JSON 是網路 API 與設定檔最常使用的格式。Python 的 json 模組可以輕鬆序列化(dump)與反序列化(load)。

import json

# Python 字典 → JSON 字串
config = {
    "app_name": "MyApp",
    "version": "1.0.0",
    "debug": True,
    "database": {
        "host": "localhost",
        "port": 5432,
        "tables": ["users", "posts", "comments"]
    }
}

json_str = json.dumps(config, ensure_ascii=False, indent=2)
print(json_str)

# 寫入 JSON 檔案
with open("config.json", "w", encoding="utf-8") as f:
    json.dump(config, f, ensure_ascii=False, indent=2)

# 從 JSON 檔案讀取
with open("config.json", "r", encoding="utf-8") as f:
    loaded = json.load(f)
    print(loaded["database"]["host"])   # localhost

2.4 實作 CLI 記事本應用

以下是一個完整的命令列記事本,支援新增、列表、刪除、搜尋功能,資料以 JSON 格式保存在本地。

步驟 4

CLI 記事本完整程式

#!/usr/bin/env python3
"""命令列記事本 — 使用 JSON 儲存"""

import json
import os
from datetime import datetime

DATA_FILE = "notes.json"


def load_notes():
    if not os.path.exists(DATA_FILE):
        return []
    with open(DATA_FILE, "r", encoding="utf-8") as f:
        return json.load(f)


def save_notes(notes):
    with open(DATA_FILE, "w", encoding="utf-8") as f:
        json.dump(notes, f, ensure_ascii=False, indent=2)


def add_note(title, content):
    notes = load_notes()
    note = {
        "id": len(notes) + 1,
        "title": title,
        "content": content,
        "created_at": datetime.now().isoformat()
    }
    notes.append(note)
    save_notes(notes)
    print(f"✅ 已新增記事:「{title}」(ID: {note['id']})")


def list_notes():
    notes = load_notes()
    if not notes:
        print("📭 目前沒有任何記事。")
        return
    print(f"\n{'='*50}")
    print(f"{'ID':<5} {'標題':<20} {'建立時間':<20}")
    print(f"{'-'*50}")
    for n in notes:
        ts = n["created_at"][:19].replace("T", " ")
        print(f"{n['id']:<5} {n['title']:<20} {ts:<20}")
    print(f"{'='*50}\n")


def search_notes(keyword):
    notes = load_notes()
    found = [n for n in notes if keyword in n["title"] or keyword in n["content"]]
    if not found:
        print(f"🔍 找不到包含「{keyword}」的記事。")
    else:
        print(f"🔍 找到 {len(found)} 筆記事:")
        for n in found:
            print(f"  [{n['id']}] {n['title']}")


def delete_note(note_id):
    notes = load_notes()
    original = len(notes)
    notes = [n for n in notes if n["id"] != note_id]
    if len(notes) == original:
        print(f"⚠️ 找不到 ID 為 {note_id} 的記事。")
    else:
        save_notes(notes)
        print(f"🗑️ 已刪除 ID 為 {note_id} 的記事。")


def show_note(note_id):
    notes = load_notes()
    for n in notes:
        if n["id"] == note_id:
            print(f"\n📄 ID: {n['id']}")
            print(f"   標題:{n['title']}")
            print(f"   內容:{n['content']}")
            print(f"   建立:{n['created_at']}\n")
            return
    print(f"⚠️ 找不到 ID 為 {note_id} 的記事。")


def main():
    import sys
    print("📒 CLI 記事本(Python 版)")
    print("指令:add / list / search / show / delete / quit\n")

    while True:
        try:
            cmd = input(">>> ").strip()
        except (KeyboardInterrupt, EOFError):
            print("\nBye!")
            break

        if cmd == "quit":
            print("再見!")
            break
        elif cmd == "list":
            list_notes()
        elif cmd.startswith("add "):
            parts = cmd[4:].split("|")
            if len(parts) == 2:
                add_note(parts[0].strip(), parts[1].strip())
            else:
                print("格式:add 標題 | 內容")
        elif cmd.startswith("search "):
            keyword = cmd[7:].strip()
            search_notes(keyword)
        elif cmd.startswith("show "):
            try:
                show_note(int(cmd[5:].strip()))
            except ValueError:
                print("ID 必須是數字。")
        elif cmd.startswith("delete "):
            try:
                delete_note(int(cmd[7:].strip()))
            except ValueError:
                print("ID 必須是數字。")
        else:
            print("未知指令。可用:add, list, search, show, delete, quit")


if __name__ == "__main__":
    main()
這個記事本使用 JSON 做持久化儲存,可以跨平台運行。嘗試在終端機執行,並用 add 買牛奶 | 今天下班去超市買牛奶 新增記事,再用 list 列出所有記事。

✏️ 練習題:將 JSON 格式改為 CSV

修改上述記事本,把 notes.json 改為 notes.csv,並支援匯出(export)功能,把所有記事匯出成一個 .txt 檔案。

三、裝飾器與閉包

裝飾器(Decorator)是 Python 最強大且最優雅的功能之一。它讓我們可以在不修改原函式的前提下,動態增添新功能。要理解裝飾器,必須先掌握「閉包(Closure)」與「可變引數(*args / **kwargs)」的概念。

3.1 可變引數 *args 與 **kwargs

當函式需要接受不定數量的參數時,使用 *args(元組)與 **kwargs(字典)。

def print_all(*args, **kwargs):
    print("位置參數(args):", args)
    print("關鍵字參數(kwargs):", kwargs)

print_all("蘋果", "香蕉", name="小明", age=25)
# 位置參數(args): ('蘋果', '香蕉')
# 關鍵字參數(kwargs): {'name': '小明', 'age': 25}

# 解包呼叫
fruits = ["葡萄", "草莓"]
info = {"city": "台北", "country": "台灣"}
print_all(*fruits, **info)
# 位置參數(args): ('葡萄', '草莓')
# 關鍵字參數(kwargs): {'city': '台北', 'country': '台灣'}

3.2 閉包(Closure)

閉包是指一個函式記住並存取其「外部作用域」中的變數,即使外部函式已經執行完畢。

def make_multiplier(factor):
    """工廠函式:建立一個乘以指定因子的函式"""
    def multiplier(number):
        return number * factor
    return multiplier   # 傳回內層函式(閉包)

times3 = make_multiplier(3)
times5 = make_multiplier(5)

print(times3(10))   # 30(10 * 3)
print(times5(10))   # 50(10 * 5)
print(times3(7))    # 21(7 * 3)

# 閉包捕获了外部的 factor
print(times3.__closure__)  # (<cell at 0x...: int object at 0x...>,)
print(times3.__closure__[0].cell_contents)  # 3

3.3 基本裝飾器

def my_decorator(func):
    """包裝函式的裝飾器"""
    def wrapper(*args, **kwargs):
        print("🔔 函式開始執行:", func.__name__)
        result = func(*args, **kwargs)
        print("✅ 函式執行完畢\n")
        return result
    return wrapper


@my_decorator
def say_hello(name):
    print(f"你好,{name}!")


@my_decorator
def add(a, b):
    return a + b


say_hello("小明")
# 🔔 函式開始執行: say_hello
# 你好,小明!
# ✅ 函式執行完畢

result = add(3, 5)
# 🔔 函式開始執行: add
# ✅ 函式執行完畢
print("結果:", result)   # 結果:8

3.4 functools.wraps 保留元資料

裝飾後的 wrapper 函式會失去原函式的名稱、文件字串(docstring)等元資料。使用 functools.wraps 可以把這些元資料「複製」到 wrapper 上。

import functools

def log_calls(func):
    @functools.wraps(func)   # 這行是關鍵!
    def wrapper(*args, **kwargs):
        print(f"📞 呼叫:{func.__name__}")
        print(f"   文件:{func.__doc__}")
        return func(*args, **kwargs)
    return wrapper


@log_calls
def greet(name, formal=False):
    """向某人打招呼,formal 參數控制正式或非正式語氣"""
    if formal:
        return f"敬啟者 {name},您好。"
    return f"嗨,{name}!"


print(greet("小明"))
# 📞 呼叫:greet
#    文件:向某人打招呼,formal 參數控制正式或非正式語氣
# 嗨,小明!

print(greet.__name__)        # greet(保留原本名稱)
print(greet.__doc__)         # 向某人打招呼...(保留文件)

3.5 帶參數的裝飾器

import functools

def repeat(times):
    """帶參數的裝飾器工廠"""
    def decorator(func):
        @functools.wraps(func)
        def wrapper(*args, **kwargs):
            results = []
            for i in range(times):
                print(f"  第 {i+1}/{times} 次")
                results.append(func(*args, **kwargs))
            return results
        return wrapper
    return decorator


@repeat(times=3)
def say(text):
    return f"我說:{text}"


outputs = say("測試")
#   第 1/3 次
#   第 2/3 次
#   第 3/3 次

for o in outputs:
    print(o)
# 我說:測試
# 我說:測試
# 我說:測試

3.6 常見內建模組裝飾器:@lru_cache 加速運算

import functools
import time

@functools.lru_cache(maxsize=128)
def fibonacci(n):
    """使用快取的最佳化費波那契(效率從 O(2^n) 降至 O(n))"""
    if n < 2:
        return n
    return fibonacci(n - 1) + fibonacci(n - 2)


# 無快取版本計時
def fib_no_cache(n):
    if n < 2:
        return n
    return fib_no_cache(n-1) + fib_no_cache(n-2)

# 測試有快取版本
start = time.perf_counter()
for i in range(35):
    fibonacci(i)
cached_time = time.perf_counter() - start
print(f"有快取:{cached_time:.4f} 秒")  # ~0.000x 秒

# 不使用 lru_cache 的同一計算
start = time.perf_counter()
# fib_no_cache(30)  # 這個會非常慢,不建議執行
uncached_time = time.perf_counter() - start
print(f"比較:相同硬體下快取版本快了幾百倍")
練習提議:試著建立一個 @timer 裝飾器,用於測量任何函式的執行時間。提示:使用 time.perf_counter() 計算差值,並用 @functools.wraps 保留元資料。

四、生成器與迭代器

處理大量資料時,記憶體管理至關重要。生成器(Generator)與迭代器(Iterator)讓我們能以「懶惰計算(Lazy Evaluation)」的方式處理序列,不必一次性把全部資料載入記憶體。

4.1 迭代器(Iterator)

迭代器是任何實作了 __iter__()__next__() 的物件。當迭代器耗盡,會引發 StopIteration 例外。

class CountUp:
    """從 0 數到 n 的迭代器"""
    def __init__(self, limit):
        self.limit = limit
        self.current = 0

    def __iter__(self):
        return self

    def __next__(self):
        if self.current >= self.limit:
            raise StopIteration
        val = self.current
        self.current += 1
        return val


counter = CountUp(5)
for num in counter:
    print(num, end=" ")
# 輸出:0 1 2 3 4

# 手動迭代
counter2 = CountUp(3)
print("\n", next(counter2))  # 0
print(next(counter2))        # 1
print(next(counter2))        # 2
# print(next(counter2))      # StopIteration 例外

4.2 生成器(Generator)

生成器是一種更簡潔的迭代器,使用 yield 關鍵字在函式內「暫停」執行,並yield的值每次被請求時才繼續。

def count_generator(limit):
    """使用 yield 的生成器函式"""
    n = 0
    while n < limit:
        yield n          # 暫停並產出值
        n += 1

gen = count_generator(5)
print(type(gen))            # <class 'generator'>
print(list(gen))            # [0, 1, 2, 3, 4]

# 無限生成器(需配合 StopIteration 或 break)
def fibonacci_gen():
    a, b = 0, 1
    while True:
        yield a
        a, b = b, a + b

fib = fibonacci_gen()
for _ in range(10):
    print(next(fib), end=" ")
# 輸出:0 1 1 2 3 5 8 13 21 34

4.3 生成器運算式(Generator Expression)

與串列生成式類似,但用圓括號,稱為「生成器運算式」,不會立即產生完整串列。

# 串列生成式(一次性產生完整列表)
squares_list = [x**2 for x in range(1000000)]  # 占用大量記憶體

# 生成器運算式(懶惰評估)
squares_gen = (x**2 for x in range(1000000))   # 只占用極少記憶體

print(next(squares_gen))     # 0
print(next(squares_gen))     # 1
print(next(squares_gen))     # 4

# sum 也可接受生成器
total = sum(x**2 for x in range(10000))
print(f"1 萬個平方和:{total}")  # 333283335000

4.4 itertools 模組

標準庫的 itertools 提供了許多高效的迭代工具。

import itertools

# count: 無限計數器
cnt = itertools.count(start=1, step=2)
print(next(cnt))  # 1
print(next(cnt))  # 3

# cycle: 無限循環
colors = itertools.cycle(["紅", "綠", "藍"])
for _ in range(7):
    print(next(colors), end=" ")
# 輸出:紅 綠 藍 紅 綠 藍 紅

# chain: 串接多個可迭代物件
a = [1, 2, 3]
b = ["A", "B"]
combined = list(itertools.chain(a, b))
print("\n", combined)   # [1, 2, 3, 'A', 'B']

#islice: 切片(懶惰)
lazy_slice = list(itertools.islice(range(100), 10, 30, 3))
print(lazy_slice)       # [10, 13, 16, 19, 22, 25, 28]

4.5 實作:串流處理大型 CSV 檔案

步驟 5

使用生成器處理大型 CSV(避免一次性載入)

import csv

def stream_csv(filepath):
    """串流式讀取 CSV,每次只 yield 一筆記錄"""
    with open(filepath, "r", encoding="utf-8", newline="") as f:
        reader = csv.DictReader(f)
        for row in reader:
            yield row


def filter_and_sum(filepath, column_name, min_value):
    """只遍歷一次,計算滿足條件的總和"""
    total = 0
    count = 0
    for record in stream_csv(filepath):
        try:
            value = float(record[column_name])
            if value >= min_value:
                total += value
                count += 1
        except (ValueError, KeyError):
            continue
    return total, count


# 假設有一個大檔案 sales.csv(百萬筆)
# total, cnt = filter_and_sum("sales.csv", "amount", 1000)
# print(f"共 {cnt} 筆超過 1000,總和:{total}")
為什麼要用生成器?假設有一個 10GB 的 CSV 檔案,使用串列一次性讀入會導致記憶體不足。但使用生成器串流處理,每次只佔用一筆記錄的記憶體(約幾KB),讓處理巨大檔案成為可能。

✏️ 練習題:建立無限質數生成器

使用「埃氏篩法」概念,建立一個 prime_generator(),能產生無限多個質數。提示:使用 yield 並配合 itertools.islice 取前 N 個質數做測試。

五、併發程式設計 — Threading 與 Multiprocessing

現代電腦有多核心 CPU,要充分發揮效能,需要利用併發(Concurrency)與平行(Parallelism)。Python 提供兩個主要途徑:threading(執行緒,適合 I/O 密集任務)與 multiprocessing(行程,適合 CPU 密集任務)。

5.1 threading — 執行緒

執行緒適用於網路請求、檔案讀寫等 I/O 阻塞場景,因為這些場景大部分時間在等待,執行緒可以交叉執行,提高整體效率。

import threading
import time

def download_file(name, seconds):
    print(f"🟢 開始下載:{name}")
    time.sleep(seconds)
    print(f"✅ 下載完成:{name}(耗時 {seconds}s)")

# 順序執行:下載三個檔案
start = time.perf_counter()
download_file("檔案A.pdf", 2)
download_file("檔案B.zip", 3)
download_file("檔案C.jpg", 1)
seq_time = time.perf_counter() - start
print(f"順序執行總耗時:{seq_time:.2f}s\n")  # ~6s

# 執行緒執行:同時下載(I/O 等待可重疊)
start = time.perf_counter()
t1 = threading.Thread(target=download_file, args=("檔案A.pdf", 2))
t2 = threading.Thread(target=download_file, args=("檔案B.zip", 3))
t3 = threading.Thread(target=download_file, args=("檔案C.jpg", 1))

t1.start()
t2.start()
t3.start()

t1.join()   # 等待執行緒結束
t2.join()
t3.join()

thread_time = time.perf_counter() - start
print(f"執行緒執行總耗時:{thread_time:.2f}s\n")  # ~3s(最長的決定上限)

5.2 Lock — 執行緒安全

多執行緒同時修改共享資料時會產生「競態條件(Race Condition)」。使用 threading.Lock 可確保同時只有一個執行緒進入關鍵區段。

import threading

counter = 0
lock = threading.Lock()


def increment(amount, name):
    global counter
    for _ in range(amount):
        with lock:          # 取得鎖,才進入臨界區
            current = counter
            # 模擬運算延遲
            # (若沒有鎖,多個執行緒可能同時讀到相同的 counter 值)
            counter = current + 1


# 無鎖版本(會產生錯誤)
counter_broken = 0
def increment_broken(amount):
    global counter_broken
    for _ in range(amount):
        counter_broken += 1


threads = [threading.Thread(target=increment_broken, args=(10000,)) for _ in range(5)]
for t in threads:
    t.start()
for t in threads:
    t.join()
print(f"無鎖版本結果:{counter_broken}(應為 50000,但通常會少)")

# 有鎖版本(正確)
counter = 0
threads = [threading.Thread(target=increment, args=(10000, f"執行緒{i}")) for i in range(5)]
for t in threads:
    t.start()
for t in threads:
    t.join()
print(f"有鎖版本結果:{counter}(正確:50000)")
GIL 限制:Python 的全域解釋器鎖(GIL)讓同一時間只有一個執行緒執行 Python 位元組碼。因此在 CPU 密集型任務上,執行緒無法真正平行。此時應使用 multiprocessing,它會建立獨立的 Python 解釋器行程,徹底突破 GIL。

5.3 multiprocessing — 行程

import multiprocessing
import time

def cpu_task(n):
    """一個較耗 CPU 的計算任務"""
    total = sum(i * i for i in range(n))
    return total

# 順序執行
start = time.perf_counter()
results = [cpu_task(5000000) for _ in range(4)]
seq_time = time.perf_counter() - start
print(f"順序執行:{seq_time:.2f}s")

# 多行程執行
if __name__ == "__main__":
    start = time.perf_counter()
    with multiprocessing.Pool(processes=4) as pool:
        results = pool.map(cpu_task, [5000000] * 4)
    par_time = time.perf_counter() - start
    print(f"多行程執行:{par_time:.2f}s")
    print(f"加速比:{seq_time / par_time:.2f}x")

5.4 Process Pool 與結果收集

import multiprocessing

def calculate_square(n):
    return n * n

def calculate_with_metadata(n):
    return {"input": n, "square": n * n, "cube": n ** 3}


if __name__ == "__main__":
    numbers = list(range(1, 9))

    # map:依序收集結果(順序與輸入一致)
    with multiprocessing.Pool(4) as pool:
        squares = pool.map(calculate_square, numbers)
    print("平方:", squares)

    # starmap:傳入多元組參數
    with multiprocessing.Pool(4) as pool:
        params = [(n,) for n in range(1, 5)]
        cubes = pool.starmap(lambda n: n**3, params)
    print("立方:", cubes)

    # apply_async:非同步提交,不等待
    with multiprocessing.Pool(2) as pool:
        async_result = pool.apply_async(calculate_square, (99,))
        print("非同步結果:", async_result.get())   # 9801

5.5 執行緒+佇列(Queue)生產者/消費者模式

import threading
import queue
import time

q = queue.Queue()
stop_flag = threading.Event()

def producer(n):
    """生產者:產生工作項目"""
    for i in range(n):
        time.sleep(0.1)
        q.put(f"任務-{i+1}")
        print(f"📦 生產:任務-{i+1}")

def consumer(worker_id):
    """消費者:處理工作項目"""
    while not stop_flag.is_set() or not q.empty():
        try:
            item = q.get(timeout=1)    # 等待最多 1 秒
            print(f"  🛠️ 員工{worker_id} 處理:{item}")
            q.task_done()
        except queue.Empty:
            continue


# 建立生產者與消費者執行緒
threads = []
threads.append(threading.Thread(target=producer, args=(10,)))
threads.append(threading.Thread(target=consumer, args=(1,)))
threads.append(threading.Thread(target=consumer, args=(2,)))

for t in threads:
    t.start()
for t in threads:
    t.join()

q.join()
print("✅ 所有任務完成")

5.6 實作:並行網頁抓取

步驟 6

多執行緒批次抓取網頁標題

import threading
import time
import urllib.request
import ssl

# 忽略 SSL 憑證(僅供範例)
ctx = ssl.create_default_context()
ctx.check_hostname = False
ctx.verify_mode = ssl.CERT_NONE

urls = [
    "https://example.com",
    "https://example.org",
    "https://example.net",
    # (可替換為真實 URL)
]

results = []
lock = threading.Lock()


def fetch_url(url):
    try:
        with urllib.request.urlopen(url, timeout=5, context=ctx) as resp:
            title = resp.read(200).decode("utf-8", errors="ignore")
        with lock:
            results.append({"url": url, "status": "成功", "size": len(title)})
        print(f"✅ {url}")
    except Exception as e:
        with lock:
            results.append({"url": url, "status": str(e), "size": 0})
        print(f"❌ {url}: {e}")


start = time.perf_counter()
threads = [threading.Thread(target=fetch_url, args=(u,)) for u in urls]
for t in threads:
    t.start()
for t in threads:
    t.join()

elapsed = time.perf_counter() - start
print(f"\n完成!耗時 {elapsed:.2f}s,結果:")
for r in results:
    print(f"  {r['url']} → {r['status']}({r['size']} bytes)")
什麼時候用什麼?
I/O 密集(網路、磁碟、檔案):用 threadingasyncio
CPU 密集(運算、影像處理、數值分析):用 multiprocessing
需要跨行程共享資料:用 multiprocessing.Queuemultiprocessing.Manager
需要高併發 I/O:考慮 asyncio + aiohttp,效能比執行緒更高

✏️ 練習題:並行檔案處理

寫一個程式,使用 multiprocessing.Pool 同時處理多個文字檔,計算每個檔案的字數、行數與出現次數最高的單詞。輸入是一個資料夾路徑,輸出每個檔案的統計結果。