我有以下代码:
import time
from watchdog.observers import Observer
from watchdog.events import FileSystemEventHandler, PatternMatchingEventHandler
import duckdb
path = "landing/persistent/"
global con
con = duckdb.connect(database='formatted/my-db.duckdb', read_only=False)
class EventHandler(PatternMatchingEventHandler):
def __init__(self, query):
PatternMatchingEventHandler.__init__(
self,
patterns=["*.csv"],
ignore_directories=True,
case_sensitive=False,
)
self.query = query
def on_created(self, event):
filename = event.src_path.split("/")[-1]
filename_trunc = filename.split(".")[0]
try:
cursor = con.cursor()
cursor.execute(query.format(filename_trunc),
[event.src_path])
cursor.execute("show tables")
print(cursor.fetchall())
except Exception as e:
print(e)
finally:
cursor.close()
query = "CREATE TABLE {} AS SELECT * FROM read_csv_auto(?);"
event_handler = EventHandler(query)
observer = Observer()
observer.schedule(event_handler, path, recursive=True)
observer.start()
try:
while True:
time.sleep(1)
except KeyboardInterrupt:
observer.stop()
observer.join()
它基本上等待路径中的文件并将其作为新表添加到数据库中。 但是,我有两个额外的用例:
- 添加一个经过一些修改的新 csv(已为该文件名创建一个表),并且表中可能已存在一些行。
- 添加带有新列的新 csv(已为该文件名创建表格)
但是,我不知道使用 sql 或 duckdb 如何有效地跟踪这两种情况。 任何帮助将不胜感激,谢谢。
最佳答案
如果表不存在,您可以使用CREATE OR REPLACE
语法创建表,如果表存在则替换。
query = "CREATE OR REPLACE TABLE {} AS SELECT * FROM read_csv_auto(?);"
每次您拥有与表同名的 csv 文件时,此选项基本上都会删除并重新创建表。
或者
仅当表尚不存在时,您才可以使用 IF NOT EXISTS
语法来创建该表。
query = "CREATE TABLE IF NOT EXISTS {} AS SELECT * FROM read_csv_auto(?);"
接下来,您必须在 python 中构建一些逻辑来更新行或添加列。 你可以试试loading the csv file首先作为pandas DF。 然后使用下面链接的 duckdb 和 pandas DF 功能。
https://duckdb.org/docs/guides/python/sql_on_pandas https://duckdb.org/docs/guides/python/import_pandas https://duckdb.org/docs/guides/python/export_pandas
关于python - 在 Duckdb 中向表或记录中添加不重复的列,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/74186714/