实现新增接口
基础代码
import env
import mcrud
import api
import snowflake
# Table name and the client-writable columns used by the handler below.
table = "user"
columns = ["name", "age"]

# Load the .env file first, then create the database handle
# (presumably mcrud.new_env() reads its settings from the environment).
env.load(".env")
db = mcrud.new_env()
async def add_user(req):
    """Insert one row into ``table`` from the JSON body of the request.

    The value for every configured column is looked up in the parsed body
    with ``data.get`` (assumes the body parses to a dict, so a missing key
    yields ``None``). When ``"id"`` is not one of the configured columns,
    an id produced by ``snowflake.idstr()`` is added to the insert.

    Args:
        req: The incoming request object handed over by the ``api`` framework.

    Returns:
        The result of ``api.resp.success(data)``, i.e. a success response
        carrying the parsed request body.
    """
    data = await api.req.get_json(req)

    # Work on a per-request copy: appending "id" to the module-level
    # ``columns`` list would make every later request skip the id
    # generation below and take the id from the request body instead.
    insert_columns = list(columns)
    values = [data.get(k) for k in insert_columns]
    if "id" not in insert_columns:
        insert_columns.append("id")
        values.append(snowflake.idstr())

    db.add(table, insert_columns, values)
    return api.resp.success(data)
# Application object: one POST route bound to add_user, plus the cors() middleware.
app = api.Api(
    routes=[
        api.resp.post("/user/add", add_user),
    ],
    middleware=[api.middleware.cors()],
)

# Call app.run() only when this file is executed directly.
if __name__ == '__main__':
    app.run()
基本封装
import env
import mcrud
import api
import amcrud
# Table name and the columns handed to the amcrud handler factory.
table = "user"
columns = ["name", "age"]

# Load the .env file first, then create the database handle
# (presumably mcrud.new_env() reads its settings from the environment).
env.load(".env")
db = mcrud.new_env()

# Same route as the hand-written version, with the handler built by amcrud.add().
app = api.Api(
    routes=[
        api.resp.post("/user/add", amcrud.add(db, table, columns)),
    ],
    middleware=[api.middleware.cors()],
)

# Call app.run() only when this file is executed directly.
if __name__ == '__main__':
    app.run()
测试
zhttp 127.0.0.1:8888/user/add name=zs age=23
查询所有用户接口
基础代码
import env
import mcrud
import api
import amcrud
# Table name and the columns shared by the handlers below.
table = "user"
columns = ["name", "age"]

# Load the .env file first, then create the database handle
# (presumably mcrud.new_env() reads its settings from the environment).
env.load(".env")
db = mcrud.new_env()
async def get_all(req):
    """Return all rows of ``table``, selecting the configured columns plus ``id``.

    Args:
        req: The incoming request object (not read by this handler).

    Returns:
        The result of ``api.resp.success(data)`` where ``data`` is whatever
        ``db.get_all`` returns for the selected columns.
    """
    # Build the select list locally. The module-level ``columns`` list is
    # also passed to amcrud.add(), so appending "id" to it here would change
    # what the add route sees after the first GET request.
    query_columns = list(columns)
    if "id" not in query_columns:
        query_columns.append("id")

    data = db.get_all(table, query_columns)
    return api.resp.success(data)
# Application object: create (amcrud) and list (get_all) routes on /user,
# plus the cors() middleware.
app = api.Api(
    routes=[
        api.resp.post("/user", amcrud.add(db, table, columns)),
        api.resp.get("/user", get_all),
    ],
    middleware=[api.middleware.cors()],
)

# Call app.run() only when this file is executed directly.
if __name__ == '__main__':
    app.run()
基本封装
import env
import mcrud
import api
import amcrud
# Table name and the columns handed to the amcrud handler factories.
table = "user"
columns = ["name", "age"]

# Load the .env file first, then create the database handle
# (presumably mcrud.new_env() reads its settings from the environment).
env.load(".env")
db = mcrud.new_env()

# Create and list routes on /user, both built by amcrud.
app = api.Api(
    routes=[
        api.resp.post("/user", amcrud.add(db, table, columns)),
        api.resp.get("/user", amcrud.get_all(db, table, columns)),
    ],
    middleware=[api.middleware.cors()],
)

# Call app.run() only when this file is executed directly.
if __name__ == '__main__':
    app.run()
测试
zhttp 127.0.0.1:8888/user
根据ID查询接口
基础代码
import env
import mcrud
import api
import amcrud
# Table name and the columns shared by the handlers below.
table = "user"
columns = ["name", "age"]

# Load the .env file first, then create the database handle
# (presumably mcrud.new_env() reads its settings from the environment).
env.load(".env")
db = mcrud.new_env()
async def get(req):
    """Look up one row of ``table`` by the ``id`` path parameter.

    Args:
        req: The incoming request; its ``id`` path parameter selects the row.

    Returns:
        The result of ``api.resp.success(...)`` wrapping whatever
        ``db.get_by_id`` returns for that id.
    """
    row = db.get_by_id(table, api.req.get_path(req, "id"))
    return api.resp.success(row)
# Application object: create and list routes from amcrud, plus the
# hand-written get handler on /user/{id}.
app = api.Api(
    routes=[
        api.resp.post("/user", amcrud.add(db, table, columns)),
        api.resp.get("/user", amcrud.get_all(db, table, columns)),
        api.resp.get("/user/{id}", get),
    ],
    middleware=[api.middleware.cors()],
)

# Call app.run() only when this file is executed directly.
if __name__ == '__main__':
    app.run()
基本封装
import env
import mcrud
import api
import amcrud
# Table name and the columns handed to the amcrud handler factories.
table = "user"
columns = ["name", "age"]

# Load the .env file first, then create the database handle
# (presumably mcrud.new_env() reads its settings from the environment).
env.load(".env")
db = mcrud.new_env()

# Create, list and get-by-id routes, all built by amcrud.
app = api.Api(
    routes=[
        api.resp.post("/user", amcrud.add(db, table, columns)),
        api.resp.get("/user", amcrud.get_all(db, table, columns)),
        api.resp.get("/user/{id}", amcrud.get(db, table, columns)),
    ],
    middleware=[api.middleware.cors()],
)

# Call app.run() only when this file is executed directly.
if __name__ == '__main__':
    app.run()
测试
zhttp 127.0.0.1:8888/user/1793956346500816896
根据ID修改接口
基础代码
import env
import mcrud
import api
import amcrud
# Table name and the columns shared by the handlers below.
table = "user"
columns = ["name", "age"]

# Load the .env file first, then create the database handle
# (presumably mcrud.new_env() reads its settings from the environment).
env.load(".env")
db = mcrud.new_env()
async def update(req):
    """Update the row selected by the ``id`` path parameter.

    Only body fields whose key appears in the module-level ``columns``
    list are forwarded to ``db.update``; all other keys are ignored.

    Args:
        req: The incoming request; supplies the ``id`` path parameter and
            a JSON body (assumed to parse to a dict of column -> value).

    Returns:
        The result of ``api.resp.success()`` (no payload).
    """
    row_id = api.req.get_path(req, "id")
    payload = await api.req.get_json(req)

    # Keep the accepted (column, value) pairs in body order, then split
    # them into the two parallel lists that db.update() takes.
    accepted = [(key, value) for key, value in payload.items() if key in columns]
    new_columns = [key for key, _ in accepted]
    values = [value for _, value in accepted]

    db.update(table, row_id, new_columns, values)
    return api.resp.success()
# Application object: create, list and get routes from amcrud, plus the
# hand-written update handler on PUT /user/{id}.
app = api.Api(
    routes=[
        api.resp.post("/user", amcrud.add(db, table, columns)),
        api.resp.get("/user", amcrud.get_all(db, table, columns)),
        api.resp.get("/user/{id}", amcrud.get(db, table, columns)),
        api.resp.put("/user/{id}", update),
    ],
    middleware=[api.middleware.cors()],
)

# Call app.run() only when this file is executed directly.
if __name__ == '__main__':
    app.run()
基本封装
import env
import mcrud
import api
import amcrud
# Table name and the columns handed to the amcrud handler factories.
table = "user"
columns = ["name", "age"]

# Load the .env file first, then create the database handle
# (presumably mcrud.new_env() reads its settings from the environment).
env.load(".env")
db = mcrud.new_env()

# Create, list, get-by-id and update routes, all built by amcrud.
app = api.Api(
    routes=[
        api.resp.post("/user", amcrud.add(db, table, columns)),
        api.resp.get("/user", amcrud.get_all(db, table, columns)),
        api.resp.get("/user/{id}", amcrud.get(db, table, columns)),
        api.resp.put("/user/{id}", amcrud.update(db, table, columns)),
    ],
    middleware=[api.middleware.cors()],
)

# Call app.run() only when this file is executed directly.
if __name__ == '__main__':
    app.run()
测试
zhttp PUT 127.0.0.1:8888/user/1793956346500816896 name=zss age=33
zhttp 127.0.0.1:8888/user/1793956346500816896
根据ID删除接口
基础代码
import env
import mcrud
import api
import amcrud
# Table name and the columns shared by the handlers below.
table = "user"
columns = ["name", "age"]

# Load the .env file first, then create the database handle
# (presumably mcrud.new_env() reads its settings from the environment).
env.load(".env")
db = mcrud.new_env()
async def delete_route(req):
    """Delete the row of ``table`` selected by the ``id`` path parameter.

    Args:
        req: The incoming request; its ``id`` path parameter selects the row.

    Returns:
        The result of ``api.resp.success()`` (no payload).
    """
    db.delete(table, api.req.get_path(req, "id"))
    return api.resp.success()
# Application object: create, list, get and update routes from amcrud, plus
# the hand-written delete handler on DELETE /user/{id}.
app = api.Api(
    routes=[
        api.resp.post("/user", amcrud.add(db, table, columns)),
        api.resp.get("/user", amcrud.get_all(db, table, columns)),
        api.resp.get("/user/{id}", amcrud.get(db, table, columns)),
        api.resp.put("/user/{id}", amcrud.update(db, table, columns)),
        api.resp.delete("/user/{id}", delete_route),
    ],
    middleware=[api.middleware.cors()],
)

# Call app.run() only when this file is executed directly.
if __name__ == '__main__':
    app.run()
基本封装
import env
import mcrud
import api
import amcrud
# Table name and the columns handed to the amcrud handler factories.
table = "user"
columns = ["name", "age"]

# Load the .env file first, then create the database handle
# (presumably mcrud.new_env() reads its settings from the environment).
env.load(".env")
db = mcrud.new_env()

# All five CRUD routes built by amcrud; delete() takes no column list.
app = api.Api(
    routes=[
        api.resp.post("/user", amcrud.add(db, table, columns)),
        api.resp.get("/user", amcrud.get_all(db, table, columns)),
        api.resp.get("/user/{id}", amcrud.get(db, table, columns)),
        api.resp.put("/user/{id}", amcrud.update(db, table, columns)),
        api.resp.delete("/user/{id}", amcrud.delete(db, table)),
    ],
    middleware=[api.middleware.cors()],
)

# Call app.run() only when this file is executed directly.
if __name__ == '__main__':
    app.run()
测试
zhttp DELETE 127.0.0.1:8888/user/1793956346500816896
zhttp 127.0.0.1:8888/user