Unofficial Python implementation of the Siemens S7 communication protocol for interacting with Siemens S7 PLCs
Note: I only have an S7-1200 to test with, so I can't guarantee it works with other models (S7-300, S7-400, S7-1500).
Disclaimer: This project is provided "as is", without warranty of any kind, express or implied. The author assumes no liability for any damage, loss, downtime, or safety issues resulting from the use of this software, including but not limited to damage to PLCs, industrial equipment, production systems, or data.
This library is an unofficial, experimental implementation of the S7 protocol and is not intended for production or safety-critical use. It was created primarily for educational and research purposes.
The software has not been certified, validated, or tested for industrial deployment. Use at your own risk.
Always test thoroughly in a safe, isolated environment before connecting to live equipment.
This project is not affiliated with or endorsed by Siemens AG.
- Synchronous and Asynchronous clients - Choose between
ClientandAsyncClient - High-level API - Simple string-based addressing (
"DB1.0 INT 1") - Low-level API - Direct access to
S7Comm/AsyncS7Commfor advanced use cases - Read/Write operations - Single and multi-variable read/write support
- SZL (System Status List) - Read CPU state, module identification, and other diagnostic information
- PLC Control - Stop PLC execution
- Type-safe - Full type hints with mypy strict mode
pip install python-s7commOr with uv:
uv add python-s7commfrom python_s7comm import Client
# Connect to PLC
client = Client()
client.connect(address="192.168.0.1", rack=0, slot=1)
# Read data
data = client.read_area("DB1.0 INT 1") # Read 1 INT from DB1 at offset 0
print(int.from_bytes(data, "big", signed=True))
# Write data
client.write_area("DB1.0 INT 1", (42).to_bytes(2, "big", signed=True))
# Read multiple variables
results = client.read_multi_vars([
"DB1.0 INT 1",
"DB1.2 REAL 1",
"M0 BYTE 4",
])
# Get CPU state
cpu_state = client.get_cpu_state()
print(f"CPU is in {cpu_state.name} mode")
# Disconnect
client.disconnect()from python_s7comm import Client
with Client() as client:
client.connect(address="192.168.0.1", rack=0, slot=1)
data = client.read_area("DB1.0 BYTE 10")import asyncio
from python_s7comm import AsyncClient
async def main():
client = AsyncClient()
await client.connect(address="192.168.0.1", rack=0, slot=1)
# Read data
data = await client.read_area("DB1.0 DINT 2")
# Write data
await client.write_area("DB1.100 BYTE 4", b"\x01\x02\x03\x04")
# Read SZL
order_code = await client.get_order_code()
print(f"PLC Order Code: {order_code}")
await client.disconnect()
asyncio.run(main())Note:
AsyncClientuses an internal lock to send requests sequentially. The S7 protocol does not support concurrent requests on a single connection. But if you are brave enough, you can create multipleAsyncClientinstances (e.g., one for reading, another for writing) and use them within the same event loop:
import asyncio
import struct
from python_s7comm import AsyncClient
async def periodic_reader(client: AsyncClient):
"""Reads data every 1 second - runs independently."""
data = None
while True:
response = await client.read_area("DB2.0 INT 1")
new_value = int.from_bytes(response, "big", signed=True)
if new_value != data:
print(f"Data changed from {data} to {new_value}")
data = new_value
await asyncio.sleep(1)
async def writer(client: AsyncClient):
"""Performs writes - doesn't block the reader."""
for i in range(5):
await client.write_area("DB2.0 INT 1", struct.pack("!H", i))
print(f"Write: {i}")
await asyncio.sleep(3)
async def main():
reader = AsyncClient()
writer_client = AsyncClient()
await reader.connect(address="192.168.0.1", rack=0, slot=1)
await writer_client.connect(address="192.168.0.1", rack=0, slot=1)
# Reader and writer run concurrently without blocking each other
read_task = asyncio.create_task(periodic_reader(reader))
write_task = asyncio.create_task(writer(writer_client))
await write_task # Wait for writes to complete
read_task.cancel() # Stop the periodic reader
await reader.disconnect()
await writer_client.disconnect()
asyncio.run(main())DB area: DB<number>.<offset>[.<bit>] <type> <count>
Other areas: <area><offset>[.<bit>] <type> <count>
| Address | Description |
|---|---|
DB1.0 INT 1 |
1 INT from DB1 at byte 0 |
DB1.100 BYTE 10 |
10 BYTEs from DB1 at byte 100 |
DB5.0.0 BOOL 1 |
Bit 0 from DB5 at byte 0 |
M0 BYTE 4 |
4 BYTEs from Marker area |
I0.0 BOOL 1 |
Input bit 0.0 |
Q0 BYTE 1 |
1 BYTE from Output area |
| Area | Code | Description |
|---|---|---|
DB |
0x84 | Data Blocks |
M |
0x83 | Markers (Flags) |
I |
0x81 | Inputs |
Q |
0x82 | Outputs |
C |
0x1C | Counters |
T |
0x1D | Timers |
P |
0x80 | Peripheral I/O |
BOOL, BYTE, CHAR, WORD, INT, DWORD, DINT, REAL, COUNTER, TIMER
client = Client(
tpdu_size=1024, # COTP packet size
pdu_length=480, # S7 PDU length (negotiated)
source_tsap=0x0100, # Source TSAP
dest_tsap=0x0101, # Destination TSAP
)
client.connect(
address="192.168.0.1",
rack=0, # Rack number
slot=1, # Slot number (CPU slot)
port=102, # ISO-on-TCP port (default: 102)
)- Python 3.12 or newer
- Siemens S7-1200 (6ES7 214-1HG40-0XB0)