forked from RustPython/RustPython
-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathtermios.rs
More file actions
111 lines (103 loc) · 4.33 KB
/
termios.rs
File metadata and controls
111 lines (103 loc) · 4.33 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
pub(crate) use self::termios::make_module;
#[pymodule]
mod termios {
use crate::vm::{
builtins::{PyBaseExceptionRef, PyBytes, PyInt, PyListRef, PyTypeRef},
function::IntoPyObject,
PyObjectRef, PyResult, TryFromObject, VirtualMachine,
};
use termios::Termios;
// TODO: more ioctl numbers
#[pyattr]
use libc::{TIOCGWINSZ, TIOCSWINSZ};
#[pyattr]
use termios::{
os::target::NCCS, B0, B110, B1200, B134, B150, B1800, B19200, B200, B2400, B300, B38400,
B4800, B50, B600, B75, B9600, BRKINT, CLOCAL, CREAD, CS5, CS6, CS7, CS8, CSIZE, CSTOPB,
ECHO, ECHOE, ECHOK, ECHONL, HUPCL, ICANON, ICRNL, IEXTEN, IGNBRK, IGNCR, IGNPAR, INLCR,
INPCK, ISIG, ISTRIP, IXANY, IXOFF, IXON, NOFLSH, OCRNL, ONLCR, ONLRET, ONOCR, OPOST,
PARENB, PARMRK, PARODD, TCIFLUSH, TCIOFF, TCIOFLUSH, TCION, TCOFLUSH, TCOOFF, TCOON,
TCSADRAIN, TCSAFLUSH, TCSANOW, TOSTOP, VEOF, VEOL, VERASE, VINTR, VKILL, VMIN, VQUIT,
VSTART, VSTOP, VSUSP, VTIME,
};
// Reads the terminal attributes of `fd` and returns them as seven items:
// `c_iflag`, `c_oflag`, `c_cflag`, `c_lflag`, the input speed, the output
// speed, and a list built from the `c_cc` array.
//
// Each `c_cc` entry becomes a one-byte bytes object, except that the `VMIN`
// and `VTIME` entries become integers when `ICANON` is not set in `c_lflag`.
//
// A failure of `Termios::from_fd` is converted with `termios_error`.
#[pyfunction]
fn tcgetattr(fd: i32, vm: &VirtualMachine) -> PyResult<Vec<PyObjectRef>> {
    let attrs = Termios::from_fd(fd).map_err(|e| termios_error(e, vm))?;
    let is_canonical = (attrs.c_lflag & termios::ICANON) != 0;

    let mut control_chars = Vec::with_capacity(attrs.c_cc.len());
    for (index, &value) in attrs.c_cc.iter().enumerate() {
        // Only the VMIN/VTIME slots, and only with ICANON cleared, are ints.
        let as_int = !is_canonical && (index == termios::VMIN || index == termios::VTIME);
        let item: PyObjectRef = if as_int {
            vm.ctx.new_int(value).into()
        } else {
            vm.ctx.new_bytes(vec![value as u8]).into()
        };
        control_chars.push(item);
    }

    Ok(vec![
        attrs.c_iflag.into_pyobject(vm),
        attrs.c_oflag.into_pyobject(vm),
        attrs.c_cflag.into_pyobject(vm),
        attrs.c_lflag.into_pyobject(vm),
        termios::cfgetispeed(&attrs).into_pyobject(vm),
        termios::cfgetospeed(&attrs).into_pyobject(vm),
        vm.ctx.new_list(control_chars).into(),
    ])
}
// Applies `attributes` to the terminal `fd`, passing `when` through to
// `termios::tcsetattr`.
//
// `attributes` must be a list of exactly seven items, in the order
// `iflag, oflag, cflag, lflag, ispeed, ospeed, cc`; `cc` must itself be a
// list of exactly `NCCS` items, each either a bytes object of length one or
// an integer.
//
// Raises a type error when a list has the wrong length or a `cc` item is of
// neither accepted kind. Failures of the `termios` crate calls are converted
// with `termios_error`.
#[pyfunction]
fn tcsetattr(fd: i32, when: i32, attributes: PyListRef, vm: &VirtualMachine) -> PyResult<()> {
    // Copy the seven entries out so the borrow of the list ends right here.
    let fields: [PyObjectRef; 7] = {
        let items = attributes.borrow_vec();
        let seven = <&[PyObjectRef; 7]>::try_from(&*items).map_err(|_| {
            vm.new_type_error("tcsetattr, arg 3: must be 7 element list".to_owned())
        })?;
        seven.clone()
    };
    let [iflag, oflag, cflag, lflag, ispeed, ospeed, cc] = fields;

    // Start from the current settings of `fd` and overwrite them field by field.
    let mut settings = Termios::from_fd(fd).map_err(|e| termios_error(e, vm))?;
    settings.c_iflag = iflag.try_into_value(vm)?;
    settings.c_oflag = oflag.try_into_value(vm)?;
    settings.c_cflag = cflag.try_into_value(vm)?;
    settings.c_lflag = lflag.try_into_value(vm)?;

    let input_speed = ispeed.try_into_value(vm)?;
    termios::cfsetispeed(&mut settings, input_speed).map_err(|e| termios_error(e, vm))?;
    let output_speed = ospeed.try_into_value(vm)?;
    termios::cfsetospeed(&mut settings, output_speed).map_err(|e| termios_error(e, vm))?;

    let cc_list = PyListRef::try_from_object(vm, cc)?;
    let cc_items = cc_list.borrow_vec();
    let cc_array = <&[PyObjectRef; NCCS]>::try_from(&*cc_items).map_err(|_| {
        vm.new_type_error(format!(
            "tcsetattr: attributes[6] must be {} element list",
            NCCS
        ))
    })?;

    for (slot, item) in settings.c_cc.iter_mut().zip(cc_array.iter()) {
        // A bytes object of length one wins; otherwise fall back to an integer.
        *slot = match item.payload::<PyBytes>().filter(|b| b.as_bytes().len() == 1) {
            Some(byte) => byte.as_bytes()[0] as _,
            None => match item.payload::<PyInt>() {
                Some(int) => int.try_to_primitive(vm)?,
                None => {
                    return Err(vm.new_type_error(
                        "tcsetattr: elements of attributes must be characters or integers"
                            .to_owned(),
                    ));
                }
            },
        };
    }

    termios::tcsetattr(fd, when, &settings).map_err(|e| termios_error(e, vm))?;
    Ok(())
}
fn termios_error(err: std::io::Error, vm: &VirtualMachine) -> PyBaseExceptionRef {
vm.new_exception(
error_type(vm),
vec![
err.raw_os_error().into_pyobject(vm),
vm.ctx.new_str(err.to_string()).into(),
],
)
}
// Creates the exception type registered on the module under the attribute
// name `error`: a type named "error" in module "termios" whose only base is
// the VM's `os_error` exception type.
#[pyattr(name = "error", once)]
fn error_type(vm: &VirtualMachine) -> PyTypeRef {
    let bases = vec![vm.ctx.exceptions.os_error.clone()];
    vm.ctx.new_exception_type("termios", "error", Some(bases))
}
}