Programing

SqlAlchemy 결과를 JSON으로 직렬화하는 방법은 무엇입니까?

lottogame 2020. 6. 4. 07:50
반응형

SqlAlchemy 결과를 JSON으로 직렬화하는 방법은 무엇입니까?


Django는 DB에서 JSON 형식으로 반환되는 ORM 모델을 자동으로 직렬화합니다.

SQLAlchemy 쿼리 결과를 JSON 형식으로 직렬화하는 방법은 무엇입니까?

시도 jsonpickle.encode했지만 쿼리 객체 자체를 인코딩합니다. 시도 json.dumps(items)했지만 반환

TypeError: <Product('3', 'some name', 'some desc')> is not JSON serializable

SQLAlchemy ORM 객체를 JSON / XML로 직렬화하는 것이 정말 어렵습니까? 기본 시리얼 라이저가 없습니까? 오늘날 ORM 쿼리 결과를 직렬화하는 것은 매우 일반적인 작업입니다.

내가 필요한 것은 SQLAlchemy 쿼리 결과의 JSON 또는 XML 데이터 표현을 반환하는 것입니다.

JSON / XML 형식의 SQLAlchemy 개체 쿼리 결과는 javascript datagird (JQGrid http://www.trirand.com/blog/ )에 사용되어야합니다.


평평한 구현

다음과 같은 것을 사용할 수 있습니다.

from sqlalchemy.ext.declarative import DeclarativeMeta

class AlchemyEncoder(json.JSONEncoder):

    def default(self, obj):
        if isinstance(obj.__class__, DeclarativeMeta):
            # an SQLAlchemy class
            fields = {}
            for field in [x for x in dir(obj) if not x.startswith('_') and x != 'metadata']:
                data = obj.__getattribute__(field)
                try:
                    json.dumps(data) # this will fail on non-encodable values, like other classes
                    fields[field] = data
                except TypeError:
                    fields[field] = None
            # a json-encodable dict
            return fields

        return json.JSONEncoder.default(self, obj)

다음을 사용하여 JSON으로 변환하십시오.

c = YourAlchemyClass()
print json.dumps(c, cls=AlchemyEncoder)

인코딩 할 수없는 필드는 무시합니다 ( '없음'으로 설정).

자동 확장 관계는 아닙니다 (자기 참조로 이어질 수 있고 영원히 반복 될 수 있기 때문에).

재귀적인 비 원형 구현

그러나 영원히 반복하려면 다음을 사용할 수 있습니다.

from sqlalchemy.ext.declarative import DeclarativeMeta

def new_alchemy_encoder():
    _visited_objs = []

    class AlchemyEncoder(json.JSONEncoder):
        def default(self, obj):
            if isinstance(obj.__class__, DeclarativeMeta):
                # don't re-visit self
                if obj in _visited_objs:
                    return None
                _visited_objs.append(obj)

                # an SQLAlchemy class
                fields = {}
                for field in [x for x in dir(obj) if not x.startswith('_') and x != 'metadata']:
                    fields[field] = obj.__getattribute__(field)
                # a json-encodable dict
                return fields

            return json.JSONEncoder.default(self, obj)

    return AlchemyEncoder

그런 다음 다음을 사용하여 객체를 인코딩하십시오.

print json.dumps(e, cls=new_alchemy_encoder(), check_circular=False)

이것은 모든 어린이와 모든 어린이와 모든 어린이를 인코딩합니다 ... 기본적으로 전체 데이터베이스를 잠재적으로 인코딩합니다. 이전에 인코딩 된 항목에 도달하면 '없음'으로 인코딩합니다.

재귀적이고 순환 가능하며 선택적 구현

또 다른 대안은 아마도 확장하려는 필드를 지정할 수 있다는 것입니다.

def new_alchemy_encoder(revisit_self = False, fields_to_expand = []):
    _visited_objs = []

    class AlchemyEncoder(json.JSONEncoder):
        def default(self, obj):
            if isinstance(obj.__class__, DeclarativeMeta):
                # don't re-visit self
                if revisit_self:
                    if obj in _visited_objs:
                        return None
                    _visited_objs.append(obj)

                # go through each field in this SQLalchemy class
                fields = {}
                for field in [x for x in dir(obj) if not x.startswith('_') and x != 'metadata']:
                    val = obj.__getattribute__(field)

                    # is this field another SQLalchemy object, or a list of SQLalchemy objects?
                    if isinstance(val.__class__, DeclarativeMeta) or (isinstance(val, list) and len(val) > 0 and isinstance(val[0].__class__, DeclarativeMeta)):
                        # unless we're expanding this field, stop here
                        if field not in fields_to_expand:
                            # not expanding this field: set it to None and continue
                            fields[field] = None
                            continue

                    fields[field] = val
                # a json-encodable dict
                return fields

            return json.JSONEncoder.default(self, obj)

    return AlchemyEncoder

이제 다음과 같이 호출 할 수 있습니다.

print json.dumps(e, cls=new_alchemy_encoder(False, ['parents']), check_circular=False)

예를 들어 'parents'라는 SQLAlchemy 필드 만 확장합니다.


객체를 사전으로 출력 할 수 있습니다.

class User:
   def as_dict(self):
       return {c.name: getattr(self, c.name) for c in self.__table__.columns}

그런 다음 User.as_dict()객체를 직렬화 하는 사용 합니다.

sqlalchemy 행 객체를 python dict변환 에서 설명했듯이


다음과 같이 RowProxy를 dict로 변환 할 수 있습니다.

 d = dict(row.items())

그런 다음 JSON으로 직렬화하십시오 ( datetime과 같은 것들에 대해 인코더를 지정해야 합니다) 하나의 레코드 만 원하면 관련 레코드의 전체 계층 구조가 아니라 어렵지 않습니다.

json.dumps([(dict(row.items())) for row in rs])

marshmallow 사용하는 것이 좋습니다 . 관계 및 중첩 된 객체를 지원하여 모델 인스턴스를 나타내는 직렬 변환기를 만들 수 있습니다.

다음은 문서에서 잘린 예입니다. ORM 모델을 보자 Author.

class Author(db.Model):
    id = db.Column(db.Integer, primary_key=True)
    first = db.Column(db.String(80))
    last = db.Column(db.String(80))

해당 클래스의 마시맬로 스키마는 다음과 같이 구성됩니다.

class AuthorSchema(Schema):
    id = fields.Int(dump_only=True)
    first = fields.Str()
    last = fields.Str()
    formatted_name = fields.Method("format_name", dump_only=True)

    def format_name(self, author):
        return "{}, {}".format(author.last, author.first)

... 다음과 같이 사용되었습니다.

author_schema = AuthorSchema()
author_schema.dump(Author.query.first())

... 다음과 같은 출력을 생성합니다.

{
        "first": "Tim",
        "formatted_name": "Peters, Tim",
        "id": 1,
        "last": "Peters"
}

전체 Flask-SQLAlchemy Example을 살펴보십시오 .

marshmallow-sqlalchemy특별히 호출 된 라이브러리는 SQLAlchemy와 마시맬로를 통합합니다. 해당 라이브러리에서 Author위에서 설명한 모델 의 스키마는 다음과 같습니다.

class AuthorSchema(ModelSchema):
    class Meta:
        model = Author

통합을 통해 필드 유형을 SQLAlchemy Column유형 에서 유추 할 수 있습니다.

마시멜로 -sqlalchemy.


Flask-JsonTools 패키지에는 모델에 대한 JsonSerializableBase Base 클래스 가 구현 되어 있습니다.

용법:

from sqlalchemy.ext.declarative import declarative_base
from flask.ext.jsontools import JsonSerializableBase

Base = declarative_base(cls=(JsonSerializableBase,))

class User(Base):
    #...

이제 User모델은 마술처럼 직렬화 가능합니다.

프레임 워크가 Flask가 아닌 경우 코드를 잡을 수 있습니다.


보안상의 이유로 모든 모델 필드를 반환해서는 안됩니다. 선택적으로 선택하는 것을 선호합니다.

이제 인코딩 플라스크의 JSON은 UUID, 날짜 및 관계 (및 추가 지원 queryquery_class에 대한 flask_sqlalchemy의 db.Model클래스). 인코더를 다음과 같이 업데이트했습니다.

app / json_encoder.py

    from sqlalchemy.ext.declarative import DeclarativeMeta
    from flask import json


    class AlchemyEncoder(json.JSONEncoder):
        def default(self, o):
            if isinstance(o.__class__, DeclarativeMeta):
                data = {}
                fields = o.__json__() if hasattr(o, '__json__') else dir(o)
                for field in [f for f in fields if not f.startswith('_') and f not in ['metadata', 'query', 'query_class']]:
                    value = o.__getattribute__(field)
                    try:
                        json.dumps(value)
                        data[field] = value
                    except TypeError:
                        data[field] = None
                return data
            return json.JSONEncoder.default(self, o)

app/__init__.py

# json encoding
from app.json_encoder import AlchemyEncoder
app.json_encoder = AlchemyEncoder

이를 통해 __json__인코딩하려는 필드 목록을 반환 하는 속성을 선택적으로 추가 할 수 있습니다 .

app/models.py

class Queue(db.Model):
    id = db.Column(db.Integer, primary_key=True)
    song_id = db.Column(db.Integer, db.ForeignKey('song.id'), unique=True, nullable=False)
    song = db.relationship('Song', lazy='joined')
    type = db.Column(db.String(20), server_default=u'audio/mpeg')
    src = db.Column(db.String(255), nullable=False)
    created_at = db.Column(db.DateTime, server_default=db.func.now())
    updated_at = db.Column(db.DateTime, server_default=db.func.now(), onupdate=db.func.now())

    def __init__(self, song):
        self.song = song
        self.src = song.full_path

    def __json__(self):
        return ['song', 'src', 'type', 'created_at']

내 뷰에 @jsonapi를 추가하고 결과 목록을 반환하면 출력은 다음과 같습니다.

[

{

    "created_at": "Thu, 23 Jul 2015 11:36:53 GMT",
    "song": 

        {
            "full_path": "/static/music/Audioslave/Audioslave [2002]/1 Cochise.mp3",
            "id": 2,
            "path_name": "Audioslave/Audioslave [2002]/1 Cochise.mp3"
        },
    "src": "/static/music/Audioslave/Audioslave [2002]/1 Cochise.mp3",
    "type": "audio/mpeg"
}

]

SqlAlchemy의 내부 검사를 다음과 같이 사용할 수 있습니다.

mysql = SQLAlchemy()
from sqlalchemy import inspect

class Contacts(mysql.Model):  
    __tablename__ = 'CONTACTS'
    id = mysql.Column(mysql.Integer, primary_key=True)
    first_name = mysql.Column(mysql.String(128), nullable=False)
    last_name = mysql.Column(mysql.String(128), nullable=False)
    phone = mysql.Column(mysql.String(128), nullable=False)
    email = mysql.Column(mysql.String(128), nullable=False)
    street = mysql.Column(mysql.String(128), nullable=False)
    zip_code = mysql.Column(mysql.String(128), nullable=False)
    city = mysql.Column(mysql.String(128), nullable=False)
    def toDict(self):
        return { c.key: getattr(self, c.key) for c in inspect(self).mapper.column_attrs }

@app.route('/contacts',methods=['GET'])
def getContacts():
    contacts = Contacts.query.all()
    contactsArr = []
    for contact in contacts:
        contactsArr.append(contact.toDict()) 
    return jsonify(contactsArr)

@app.route('/contacts/<int:id>',methods=['GET'])
def getContact(id):
    contact = Contacts.query.get(id)
    return jsonify(contact.toDict())

sqlalchemy 행 객체를 python dict로 변환 하십시오.


Python 3.7 이상 및 Flask 1.1 이상은 내장 데이터 클래스 패키지를 사용할 수 있습니다

from dataclasses import dataclass
from datetime import datetime
from flask import Flask, jsonify
from flask_sqlalchemy import SQLAlchemy


app = Flask(__name__)
db = SQLAlchemy(app)


@dataclass
class User(db.Model):
  id: int
  email: str

  id = db.Column(db.Integer, primary_key=True, auto_increment=True)
  email = db.Column(db.String(200), unique=True)


@app.route('/users/')
def users():
  users = User.query.all()
  return jsonify(users)  


if __name__ == "__main__":
  users = User(email="user1@gmail.com"), User(email="user2@gmail.com")
  db.create_all()
  db.session.add_all(users)
  db.session.commit()
  app.run()

/users/경로는 현재 사용자의 목록을 반환합니다.

[
  {"email": "user1@gmail.com", "id": 1},
  {"email": "user2@gmail.com", "id": 2}
]

관련 모델 자동 직렬화

@dataclass
class Account(db.Model):
  id: int
  users: User

  id = db.Column(db.Integer)
  users = db.relationship(User)  # User model would need a db.ForeignKey field

응답은 jsonify(account)이것입니다.

{  
   "id":1,
   "users":[  
      {  
         "email":"user1@gmail.com",
         "id":1
      },
      {  
         "email":"user2@gmail.com",
         "id":2
      }
   ]
}

기본 JSON 인코더 덮어 쓰기

from flask.json import JSONEncoder


class CustomJSONEncoder(JSONEncoder):
  "Add support for serializing timedeltas"

  def default(o):
    if type(o) == datetime.timedelta:
      return str(o)
    else:
      return super().default(o)

app.json_encoder = CustomJSONEncoder      

더 자세한 설명. 모델에서 다음을 추가하십시오.

def as_dict(self):
       return {c.name: str(getattr(self, c.name)) for c in self.__table__.columns}

str()파이썬 2 사용을 사용하는 경우, 그래서 파이썬 3입니다 unicode(). 날짜를 역 직렬화하는 데 도움이됩니다. 처리하지 않으면 제거 할 수 있습니다.

이제 이런 식으로 데이터베이스를 쿼리 할 수 ​​있습니다

some_result = User.query.filter_by(id=current_user.id).first().as_dict()

First()이상한 오류를 피하기 위해 필요합니다. as_dict()결과를 역 직렬화합니다. 직렬화 해제 후 json으로 전환 할 준비가되었습니다.

jsonify(some_result)

그렇게 단호하지 않습니다. 이를 위해 몇 가지 코드를 작성했습니다. 나는 아직도 그것을 연구하고 있으며 MochiKit 프레임 워크를 사용합니다. 기본적으로 프록시와 등록 된 JSON 변환기를 사용하여 Python과 Javascript 간의 복합 객체를 변환합니다.

데이터베이스 오브젝트에 대한 브라우저 측이다 db.js 그것의 기본 파이썬 프록시 소스 필요 proxy.js을 .

파이썬 측에는 기본 프록시 모듈이 있습니다. 그런 다음 webserver.py 의 SqlAlchemy 객체 인코더입니다 . 또한 models.py 파일 에있는 메타 데이터 추출기에 의존 합니다.


사용자 지정 직렬화 및 역 직렬화

"from_json" (클래스 메소드)은 json 데이터를 기반으로 Model 오브젝트를 빌드합니다.

"deserialize" 는 인스턴스에서만 호출 할 수 있으며 json의 모든 데이터를 Model 인스턴스로 병합합니다.

"직렬화" -재귀 직렬화

쓰기 전용 속성을 정의하려면 __write_only__ 속성이 필요합니다 (예 : "password_hash").

class Serializable(object):
    __exclude__ = ('id',)
    __include__ = ()
    __write_only__ = ()

    @classmethod
    def from_json(cls, json, selfObj=None):
        if selfObj is None:
            self = cls()
        else:
            self = selfObj
        exclude = (cls.__exclude__ or ()) + Serializable.__exclude__
        include = cls.__include__ or ()
        if json:
            for prop, value in json.iteritems():
                # ignore all non user data, e.g. only
                if (not (prop in exclude) | (prop in include)) and isinstance(
                        getattr(cls, prop, None), QueryableAttribute):
                    setattr(self, prop, value)
        return self

    def deserialize(self, json):
        if not json:
            return None
        return self.__class__.from_json(json, selfObj=self)

    @classmethod
    def serialize_list(cls, object_list=[]):
        output = []
        for li in object_list:
            if isinstance(li, Serializable):
                output.append(li.serialize())
            else:
                output.append(li)
        return output

    def serialize(self, **kwargs):

        # init write only props
        if len(getattr(self.__class__, '__write_only__', ())) == 0:
            self.__class__.__write_only__ = ()
        dictionary = {}
        expand = kwargs.get('expand', ()) or ()
        prop = 'props'
        if expand:
            # expand all the fields
            for key in expand:
                getattr(self, key)
        iterable = self.__dict__.items()
        is_custom_property_set = False
        # include only properties passed as parameter
        if (prop in kwargs) and (kwargs.get(prop, None) is not None):
            is_custom_property_set = True
            iterable = kwargs.get(prop, None)
        # loop trough all accessible properties
        for key in iterable:
            accessor = key
            if isinstance(key, tuple):
                accessor = key[0]
            if not (accessor in self.__class__.__write_only__) and not accessor.startswith('_'):
                # force select from db to be able get relationships
                if is_custom_property_set:
                    getattr(self, accessor, None)
                if isinstance(self.__dict__.get(accessor), list):
                    dictionary[accessor] = self.__class__.serialize_list(object_list=self.__dict__.get(accessor))
                # check if those properties are read only
                elif isinstance(self.__dict__.get(accessor), Serializable):
                    dictionary[accessor] = self.__dict__.get(accessor).serialize()
                else:
                    dictionary[accessor] = self.__dict__.get(accessor)
        return dictionary

다음은 원하는만큼 출력에 포함 할 관계를 선택할 수있는 솔루션입니다. 참고 : 이것은 dict / str을 목록이 아닌 인수로 사용하여 완전히 다시 작성하는 것입니다. 몇 가지 물건을 수정합니다 ..

def deep_dict(self, relations={}):
    """Output a dict of an SA object recursing as deep as you want.

    Takes one argument, relations which is a dictionary of relations we'd
    like to pull out. The relations dict items can be a single relation
    name or deeper relation names connected by sub dicts

    Example:
        Say we have a Person object with a family relationship
            person.deep_dict(relations={'family':None})
        Say the family object has homes as a relation then we can do
            person.deep_dict(relations={'family':{'homes':None}})
            OR
            person.deep_dict(relations={'family':'homes'})
        Say homes has a relation like rooms you can do
            person.deep_dict(relations={'family':{'homes':'rooms'}})
            and so on...
    """
    mydict =  dict((c, str(a)) for c, a in
                    self.__dict__.items() if c != '_sa_instance_state')
    if not relations:
        # just return ourselves
        return mydict

    # otherwise we need to go deeper
    if not isinstance(relations, dict) and not isinstance(relations, str):
        raise Exception("relations should be a dict, it is of type {}".format(type(relations)))

    # got here so check and handle if we were passed a dict
    if isinstance(relations, dict):
        # we were passed deeper info
        for left, right in relations.items():
            myrel = getattr(self, left)
            if isinstance(myrel, list):
                mydict[left] = [rel.deep_dict(relations=right) for rel in myrel]
            else:
                mydict[left] = myrel.deep_dict(relations=right)
    # if we get here check and handle if we were passed a string
    elif isinstance(relations, str):
        # passed a single item
        myrel = getattr(self, relations)
        left = relations
        if isinstance(myrel, list):
            mydict[left] = [rel.deep_dict(relations=None)
                                 for rel in myrel]
        else:
            mydict[left] = myrel.deep_dict(relations=None)

    return mydict

예를 들어 person / family / homes / rooms를 사용하는 예제는 json으로 바꾸는 것입니다.

json.dumps(person.deep_dict(relations={'family':{'homes':'rooms'}}))

원래의 질문은 잠시 되돌아가는 반면, 여기 (그리고 내 경험)의 수는 다른 상충 관계와 다양한 복잡성에 대한 다양한 접근법이있는 사소한 질문이라는 것을 암시합니다.

그렇기 때문에 구성 가능한 직렬화 / 역 직렬화 지원으로 SQLAlchemy의 선언적 ORM을 확장 하는 SQLAthanor 라이브러리를 작성했습니다.

라이브러리는 다음을 지원합니다.

  • Python 2.7, 3.4, 3.5 및 3.6.
  • SQLAlchemy 버전 0.9 이상
  • JSON, CSV, YAML 및 Python과의 직렬화 / 역 직렬화 dict
  • 열 / 속성, 관계, 하이브리드 속성 및 연결 프록시의 직렬화 / 역 직렬화
  • 특정 형식 및 열 / 관계 / 속성에 대한 직렬화 활성화 및 비활성화 (예 : 인바운드 password은 지원 하지만 아웃 바운드 값은 포함하지 않음 )
  • 사전 직렬화 및 역 직렬화 후 값 처리 (유효성 검증 또는 유형 강제)
  • 파이썬과 SQLAlchemy의 접근 방식과 완벽하게 일치하는 매우 간단한 구문

https://sqlathanor.readthedocs.io/en/latest 에서 (I hope!) 포괄적 인 문서를 확인할 수 있습니다.

도움이 되었기를 바랍니다!


def alc2json(row):
    return dict([(col, str(getattr(row,col))) for col in row.__table__.columns.keys()])

나는 이것으로 작은 코드 골프를 할 것이라고 생각했다.

참고 : 비즈니스 요구 사항에 따라 별도로 설계된 스키마가 있으므로 automap_base를 사용 하고 있습니다. 방금 오늘 SQLAlchemy를 사용하기 시작했지만 설명서에는 automap_base가 declarative_base에 대한 확장이라고 명시되어 있는데 이는 SQLAlchemy ORM의 일반적인 패러다임 인 것처럼 보이므로 이것이 작동해야한다고 생각합니다.

Tjorriemorrie 의 솔루션에 따라 다음 외래 키를 사용하면 환상적이지 않지만 단순히 열을 값에 일치시키고 열 값을 str () -ing 하여 Python 유형을 처리합니다. 우리의 값은 Python datetime.time 및 decimal.Decimal 클래스 유형 결과로 구성되어 작업을 완료합니다.

이것이 모든 행인들에게 도움이되기를 바랍니다!


SQLAlchemy 에서 내장 직렬 변환기사용하십시오 .

from sqlalchemy.ext.serializer import loads, dumps
obj = MyAlchemyObject()
# serialize object
serialized_obj = dumps(obj)

# deserialize object
obj = loads(serialized_obj)

세션간에 객체를 전송하는 경우을 사용하여 현재 세션에서 객체를 분리해야합니다 session.expunge(obj). 다시 첨부하려면을 클릭하십시오 session.add(obj).


다음 코드는 sqlalchemy 결과를 json으로 직렬화합니다.

import json
from collections import OrderedDict


def asdict(self):
    result = OrderedDict()
    for key in self.__mapper__.c.keys():
        if getattr(self, key) is not None:
            result[key] = str(getattr(self, key))
        else:
            result[key] = getattr(self, key)
    return result


def to_array(all_vendors):
    v = [ ven.asdict() for ven in all_vendors ]
    return json.dumps(v) 

재미 있고

def all_products():
    all_products = Products.query.all()
    return to_array(all_products)

AlchemyEncoder는 훌륭하지만 때로는 10 진수 값으로 실패합니다. 다음은 소수 문제를 해결하는 개선 된 인코더입니다.

class AlchemyEncoder(json.JSONEncoder):
# To serialize SQLalchemy objects 
def default(self, obj):
    if isinstance(obj.__class__, DeclarativeMeta):
        model_fields = {}
        for field in [x for x in dir(obj) if not x.startswith('_') and x != 'metadata']:
            data = obj.__getattribute__(field)
            print data
            try:
                json.dumps(data)  # this will fail on non-encodable values, like other classes
                model_fields[field] = data
            except TypeError:
                model_fields[field] = None
        return model_fields
    if isinstance(obj, Decimal):
        return float(obj)
    return json.JSONEncoder.default(self, obj)

나는 이것이 꽤 오래된 게시물이라는 것을 알고있다. @SashaB가 제공 한 솔루션을 가져 와서 필요에 따라 수정했습니다.

나는 그것에 다음과 같은 것을 추가했다 :

  1. 필드 무시 목록 : 직렬화 중에 무시할 필드 목록
  2. 필드 대체 목록 : 직렬화 중에 값으로 대체 할 필드 이름이 포함 된 사전입니다.
  3. 제거 된 메소드 및 BaseQuery가 직렬화되는 중

내 코드는 다음과 같습니다.

def alchemy_json_encoder(revisit_self = False, fields_to_expand = [], fields_to_ignore = [], fields_to_replace = {}):
   """
   Serialize SQLAlchemy result into JSon
   :param revisit_self: True / False
   :param fields_to_expand: Fields which are to be expanded for including their children and all
   :param fields_to_ignore: Fields to be ignored while encoding
   :param fields_to_replace: Field keys to be replaced by values assigned in dictionary
   :return: Json serialized SQLAlchemy object
   """
   _visited_objs = []
   class AlchemyEncoder(json.JSONEncoder):
      def default(self, obj):
        if isinstance(obj.__class__, DeclarativeMeta):
            # don't re-visit self
            if revisit_self:
                if obj in _visited_objs:
                    return None
                _visited_objs.append(obj)

            # go through each field in this SQLalchemy class
            fields = {}
            for field in [x for x in dir(obj) if not x.startswith('_') and x != 'metadata' and x not in fields_to_ignore]:
                val = obj.__getattribute__(field)
                # is this field method defination, or an SQLalchemy object
                if not hasattr(val, "__call__") and not isinstance(val, BaseQuery):
                    field_name = fields_to_replace[field] if field in fields_to_replace else field
                    # is this field another SQLalchemy object, or a list of SQLalchemy objects?
                    if isinstance(val.__class__, DeclarativeMeta) or \
                            (isinstance(val, list) and len(val) > 0 and isinstance(val[0].__class__, DeclarativeMeta)):
                        # unless we're expanding this field, stop here
                        if field not in fields_to_expand:
                            # not expanding this field: set it to None and continue
                            fields[field_name] = None
                            continue

                    fields[field_name] = val
            # a json-encodable dict
            return fields

        return json.JSONEncoder.default(self, obj)
   return AlchemyEncoder

그것이 누군가를 돕기를 바랍니다!


플라스크에서, 작품과 핸들이 유형의 필드를 변형 필드를 datatime
'time': datetime.datetime(2018, 3, 22, 15, 40)
"time": "2018-03-22 15:40:00":

obj = {c.name: str(getattr(self, c.name)) for c in self.__table__.columns}

# This to get the JSON body
return json.dumps(obj)

# Or this to get a response object
return jsonify(obj)

utf-8을 사용한 내장 시리얼 라이저 초크는 일부 입력에 대해 유효하지 않은 시작 바이트를 디코딩 할 수 없습니다. 대신, 나는 갔다 :

def row_to_dict(row):
    temp = row.__dict__
    temp.pop('_sa_instance_state', None)
    return temp


def rows_to_list(rows):
    ret_rows = []
    for row in rows:
        ret_rows.append(row_to_dict(row))
    return ret_rows


@website_blueprint.route('/api/v1/some/endpoint', methods=['GET'])
def some_api():
    '''
    /some_endpoint
    '''
    rows = rows_to_list(SomeModel.query.all())
    response = app.response_class(
        response=jsonplus.dumps(rows),
        status=200,
        mimetype='application/json'
    )
    return response

내 사전은 (너무 많은?) 사전을 활용합니다.

def serialize(_query):
    #d = dictionary written to per row
    #D = dictionary d is written to each time, then reset
    #Master = dictionary of dictionaries; the id Key (int, unique from database) 
    from D is used as the Key for the dictionary D entry in Master
    Master = {}
    D = {}
    x = 0
    for u in _query:
        d = u.__dict__
        D = {}
        for n in d.keys():
           if n != '_sa_instance_state':
                    D[n] = d[n]
        x = d['id']
        Master[x] = D
    return Master

플라스크 (jsonify 포함) 및 flask_sqlalchemy를 사용하여 출력을 JSON으로 인쇄합니다.

jsonify (serialize ())로 함수를 호출하십시오.

지금까지 시도한 모든 SQLAlchemy 쿼리와 함께 작동합니다 (SQLite3 실행).

참고 URL : https://stackoverflow.com/questions/5022066/how-to-serialize-sqlalchemy-result-to-json

반응형