原文:https://greptime.com/blogs/2024-03-19-keyboard-monitoring
代码:https://github.com/GreptimeTeam/demo-scene/tree/main/keyboard-monitor
项目简介
该项目实现了打字频率统计及可视化功能。
主要使用的库
pynput:允许您控制和监视输入设备。这里我们用它来获取键盘输入。
SQLAlchemy:数据库操作。这里我们用它来保存键盘输入。
streamlit:提供可视化界面。
项目组成
agent.py:获取键盘输入
display.py:可视化
补充说明
如果你不想用原文的数据库,也可以替换为本地的数据库,如免安装的 SQLite。
agent.py
# agent.py
from dotenv import load_dotenv
from pynput import keyboard
from pynput.keyboard import Key
import concurrent.futures
import logging
import os
import queue
import sqlalchemy
import sqlalchemy.exc
import sys
import time
# Generic, left and right variants of the modifier keys. on_press/on_release
# track these in `current_modifiers` instead of recording them as hits.
MODIFIERS = {
    Key.alt, Key.alt_gr, Key.alt_l, Key.alt_r,
    Key.cmd, Key.cmd_l, Key.cmd_r,
    Key.ctrl, Key.ctrl_l, Key.ctrl_r,
    Key.shift, Key.shift_l, Key.shift_r,
}
# Description of the `keyboard_monitor` table that sender_thread() uses to
# build INSERT statements: `hits` is the key (or '+'-joined combo) and `ts`
# is filled with the database's current time at insert.
_hits_column = sqlalchemy.Column('hits', sqlalchemy.String)
_ts_column = sqlalchemy.Column('ts', sqlalchemy.DateTime)
TABLE = sqlalchemy.Table('keyboard_monitor', sqlalchemy.MetaData(), _hits_column, _ts_column)
if __name__ == '__main__':
    # Load variables from a local .env file (python-dotenv), if one exists.
    load_dotenv()

    # One logger with two sinks: a per-run log file that records everything
    # from DEBUG up, and stdout, which only shows INFO and above.
    log = logging.getLogger("agent")
    log.setLevel(logging.DEBUG)
    formatter = logging.Formatter('%(asctime)s %(levelname)s %(funcName)s %(message)s')
    file_handler = logging.FileHandler(f'agent-{time.time_ns()}.log', encoding='utf-8')
    stdout_handler = logging.StreamHandler(sys.stdout)
    for handler, level in ((file_handler, logging.DEBUG), (stdout_handler, logging.INFO)):
        handler.setLevel(level)
        handler.setFormatter(formatter)
        log.addHandler(handler)

    # The original article connects to the database named by DATABASE_URL:
    #   engine = sqlalchemy.create_engine(os.environ['DATABASE_URL'],
    #                                     echo_pool=True,
    #                                     isolation_level='AUTOCOMMIT')
    # This variant writes to a local SQLite file instead.
    engine = sqlalchemy.create_engine("sqlite:///keyboard.db")

    # Shared state used by the keyboard callbacks and the worker threads.
    current_modifiers = set()      # modifier keys currently held down
    pending_hits = queue.Queue()   # hits waiting to be written by sender_thread
    cancel_signal = queue.Queue()  # any item put here tells listener_thread to stop
def on_press(key):
    """Handle a key-press event from the keyboard listener.

    A modifier key is remembered in `current_modifiers`. Any other key is
    queued on `pending_hits` as a single string: the currently held modifiers
    (sorted by name) followed by the key itself, joined with '+'.
    """
    if key not in MODIFIERS:
        parts = sorted(map(str, current_modifiers))
        parts.append(str(key))
        pending_hits.put('+'.join(parts))
    else:
        current_modifiers.add(key)
    log.debug(f'{key} pressed, current_modifiers: {current_modifiers}')
def on_release(key):
    """Handle a key-release event from the keyboard listener.

    Releasing a modifier removes it from `current_modifiers`; a warning is
    logged if it was not being tracked. Releases of other keys are only logged.
    """
    if key in MODIFIERS:
        if key in current_modifiers:
            current_modifiers.remove(key)
        else:
            log.warning(f'Key {key} not in current_modifiers {current_modifiers}')
    log.debug(f'{key} released, current_modifiers: {current_modifiers}')
# The original article creates the table in GreptimeDB with this DDL:
#
#   with engine.connect() as connection:
#       connection.execute(sqlalchemy.sql.text("""
#           CREATE TABLE IF NOT EXISTS keyboard_monitor (
#               hits STRING NULL,
#               ts TIMESTAMP(3) NOT NULL,
#               TIME INDEX ("ts")
#           ) ENGINE=mito WITH( regions = 1, ttl = '3months')
#       """))
#   ...
#
# For the local database the same two columns are declared through
# SQLAlchemy and the table is created from that declaration.
from sqlalchemy import create_engine, Table, Column, String, TIMESTAMP, MetaData, Index
metadata = MetaData()
hits_column = Column('hits', String, nullable=True)
ts_column = Column('ts', TIMESTAMP, nullable=False)
keyboard_monitor = Table('keyboard_monitor', metadata, hits_column, ts_column)
metadata.create_all(engine)
def sender_thread():
    """Drain `pending_hits` and insert each hit into the database.

    Blocks on the queue until an item arrives. A ``None`` item is the
    shutdown sentinel and ends the loop. Every other item is inserted into
    `TABLE` with the database's current time as its timestamp.

    On `OperationalError` the hit is put back on the queue when the failure
    is recognised as transient (invalidated connection, the known GreptimeDB
    internal error 1815, or an unresolved MySQL host), and also once ten
    consecutive failures have accumulated (logged as an error).

    Raises:
        sqlalchemy.exc.OperationalError: for an operational error that is not
            recognised as transient while fewer than ten consecutive
            failures have occurred.
    """
    # Consecutive failed inserts since the last success. It is incremented
    # only on failure; incrementing in a `finally` clause would also bump it
    # after a successful insert and leave it at 1 instead of 0.
    retries = 0
    while True:
        hits = pending_hits.get()
        log.debug(f'got: {hits}')
        if hits is None:
            log.info("Exiting...")
            break

        with engine.connect() as connection:
            try:
                log.debug(f'sending: {hits}')
                connection.execute(TABLE.insert().values(hits=hits, ts=sqlalchemy.func.now()))
                connection.commit()
                log.info(f'sent: {hits}')
                retries = 0
            except sqlalchemy.exc.OperationalError as e:
                prior_failures = retries
                retries += 1

                if prior_failures >= 10:
                    log.error(f'Retry exceeds. Operational error: {e}')
                    pending_hits.put(hits)
                    continue

                if e.connection_invalidated:
                    log.warning(f'Connection invalidated: {e}')
                    pending_hits.put(hits)
                    continue

                msg = str(e)
                if "(1815, 'Internal error: 1000')" in msg:
                    # TODO 1815 - should not handle internal error;
                    # see https://github.com/GreptimeTeam/greptimedb/issues/3447
                    log.warning(f'Known operational error: {e}')
                    pending_hits.put(hits)
                    continue
                elif '2005' in msg and 'Unknown MySQL server host' in msg:
                    log.warning(f'DNS temporary unresolved: {e}')
                    pending_hits.put(hits)
                    continue

                # Not a failure we know how to retry: let it reach the caller.
                raise
def listener_thread():
    """Run the keyboard listener until a cancel signal arrives.

    While the listener is active, key events are delivered to on_press and
    on_release. Once an item appears on `cancel_signal`, a ``None`` sentinel
    is queued on `pending_hits` so that sender_thread stops as well.
    """
    hook = keyboard.Listener(on_press=on_press, on_release=on_release)
    with hook:
        log.info("Listening...")
        cancel_signal.get()  # blocks until the main thread asks us to stop
        pending_hits.put(None)
        log.info("Exiting...")
# Run the sender and the listener side by side and wait until one of them
# fails (or both finish). Whatever happens, both workers are told to stop
# before the executor is left, because leaving it joins both threads.
with concurrent.futures.ThreadPoolExecutor() as executor:
    sender = executor.submit(sender_thread)
    listener = executor.submit(listener_thread)
    try:
        f = concurrent.futures.wait([sender, listener], return_when=concurrent.futures.FIRST_EXCEPTION)
        for fut in f.done:
            # exception() returns None for a future that completed normally;
            # only report real failures.
            error = fut.exception(timeout=0)
            if error is not None:
                log.error(f'Unhandled exception for futures: {error}')
    except KeyboardInterrupt:
        log.info("KeyboardInterrupt. Exiting...")
    except Exception as e:
        log.error(f'Unhandled exception: {e}')
    finally:
        # Stop the listener ...
        cancel_signal.put(True)
        # ... and stop the sender directly as well. If the listener thread
        # died before queuing its own sentinel, the sender would otherwise
        # block on the queue forever and the executor would never exit.
        # A second sentinel left in the queue is harmless.
        pending_hits.put(None)
display.py
# display.py
import datetime
import os
from dotenv import load_dotenv
import pytz
import streamlit as st
import tzlocal
import pandas
st.title("Keyboard Monitor")
load_dotenv()

# Earlier form of the connection, kept for reference:
#   conn = st.connection(
#   #    type="sql",
#       url="sqlite:///keyboard.db",
#   )
# The dashboard reads the same SQLite file that agent.py writes to.
conn = st.connection('keyboard', type='sql', url="sqlite:///keyboard.db")

# Headline figure: the number of rows recorded in keyboard_monitor.
df = conn.query("SELECT COUNT(*) AS total_hits FROM keyboard_monitor")
total_hits = df["total_hits"][0]
st.metric(label="Total hits", value=total_hits)
def _top_hit(frame, placeholder="N/A"):
    """Return the `hits` value of the first row of *frame*.

    Returns *placeholder* when the query matched no rows (for example before
    any combo has been recorded), where indexing row 0 would raise.
    """
    if frame.empty:
        return placeholder
    return frame.hits[0]


# Two side-by-side metrics: the most pressed single key and the most used
# combination. Combos are recognised by the '+' that on_press in agent.py
# puts between the held modifiers and the key.
# NOTE(review): a bare '+' keypress is presumably stored as a string that
# itself contains '+', so it would be counted as a combo - confirm.
most_frequent_key, most_frequent_combo = st.columns(2)

df = conn.query("""
SELECT hits, COUNT(*) as times
FROM keyboard_monitor
WHERE hits NOT LIKE '%+%'
GROUP BY hits
ORDER BY times DESC limit 1;
""")
most_frequent_key.metric("Most frequent key", _top_hit(df))

df = conn.query("""
SELECT hits, COUNT(*) as times
FROM keyboard_monitor
WHERE hits LIKE '%+%'
GROUP BY hits
ORDER BY times DESC limit 1;
""")
most_frequent_combo.metric("Most frequent combo", _top_hit(df))
# Two side-by-side tables: the ten most pressed single keys (no '+' in
# `hits`) and the ten most used combinations ('+' in `hits`).
top_frequent_keys, top_frequent_combos = st.columns(2)

top_keys_df = conn.query("""
SELECT hits, COUNT(*) as times
FROM keyboard_monitor
WHERE hits NOT LIKE '%+%'
GROUP BY hits
ORDER BY times DESC limit 10;
""")
with top_frequent_keys:
    st.subheader("Top 10 keys")
    st.dataframe(top_keys_df)

df = conn.query("""
SELECT hits, COUNT(*) as times
FROM keyboard_monitor
WHERE hits LIKE '%+%'
GROUP BY hits
ORDER BY times DESC limit 10;
""")
with top_frequent_combos:
    st.subheader("Top 10 combos")
    st.dataframe(df)
# Hourly key-press counts for a day picked by the user.
st.header("Find your inputs frequency of day")

# The query result is converted to this zone before display.
local_tz = tzlocal.get_localzone()

d = st.date_input("Pick a day:", value=datetime.date.today())

# The day filter compares the local-time date of each row with the picked
# day; rows are then bucketed per hour. MIN(ts) gives every bucket a
# well-defined timestamp (its earliest hit): a bare `ts` column in a GROUP BY
# query would be taken from an arbitrary row of the group in SQLite.
# NOTE(review): LIMIT 10 shows at most ten hourly buckets per day - confirm
# that this cap is intended.
# NOTE(review): the filter uses 'localtime' but the GROUP BY buckets by the
# stored (UTC) hour; these differ for zones with a non-whole-hour offset.
query = f"""
SELECT
MIN(ts) AS ts,
COUNT(1) AS times
FROM keyboard_monitor
WHERE strftime('%Y-%m-%d', ts, 'localtime') = '{d}'
GROUP BY strftime('%Y-%m-%d %H:00:00', ts)
ORDER BY ts ASC
LIMIT 10;
"""
df = conn.query(query)

# The values are parsed as naive datetimes, labelled as UTC and then shown
# in the local zone.
df['ts'] = pandas.to_datetime(df['ts'])
df['ts'] = df['ts'].dt.tz_localize(pytz.utc).dt.tz_convert(local_tz)
st.dataframe(df)