After bulk loading CSVs for a while, I see the following service status:
+--------------------+-------------------------+-------------------------+
| Service Name       | Service Status          | Process State           |
+--------------------+-------------------------+-------------------------+
| ADMIN              | Online                  | Running                 |
| CTRL               | Online                  | Running                 |
| DICT               | Online                  | Running                 |
| ETCD               | Online                  | Running                 |
| GPE                | Online                  | Running                 |
| GSE                | Online                  | Running                 |
| GSQL               | Online                  | Running                 |
| GUI                | Online                  | Running                 |
| IFM                | Online                  | Running                 |
| KAFKA              | Online                  | Running                 |
| KAFKACONN          | Online                  | Running                 |
| KAFKASTRM-LL       | Down                    | Stopped                 |
| NGINX              | Online                  | Running                 |
| RESTPP             | Online                  | Running                 |
| TS3                | Online                  | Running                 |
| TS3SERV            | Online                  | Running                 |
| ZK                 | Online                  | Running                 |
+--------------------+-------------------------+-------------------------+
In the logs for this service I see the following error:
[2020-12-10 15:14:57,777] INFO stream-thread [log-aggregation-e76da75d-e559-416b-9b7f-de107b1b3b65-StreamThread-1] Shutdown complete (org.apache.kafka.streams.processor.internals.StreamThread:1150)
[2020-12-10 15:14:57,777] ERROR Log aggregation stream stopped due to unexpected exception (com.tigergraph.kafka.stream.loadinglog.LogAggregation:168)
org.apache.kafka.streams.errors.StreamsException: task [0_0] Abort sending since an error caught with a previous record (timestamp 1607611403192) to topic log-aggregation-loading-progress-changelog due to org.apache.kafka.common.errors.RecordTooLargeException: The message is 1056245 bytes when serialized which is larger than 1048576, which is the value of the max.request.size configuration.
at org.apache.kafka.streams.processor.internals.RecordCollectorImpl.recordSendError(RecordCollectorImpl.java:154)
at org.apache.kafka.streams.processor.internals.RecordCollectorImpl.access$500(RecordCollectorImpl.java:52)
at org.apache.kafka.streams.processor.internals.RecordCollectorImpl$1.onCompletion(RecordCollectorImpl.java:214)
at org.apache.kafka.clients.producer.KafkaProducer.doSend(KafkaProducer.java:959)
at org.apache.kafka.clients.producer.KafkaProducer.send(KafkaProducer.java:862)
at org.apache.kafka.streams.processor.internals.RecordCollectorImpl.send(RecordCollectorImpl.java:181)
at org.apache.kafka.streams.state.internals.StoreChangeLogger.logChange(StoreChangeLogger.java:69)
at org.apache.kafka.streams.state.internals.ChangeLoggingTimestampedKeyValueBytesStore.log(ChangeLoggingTimestampedKeyValueBytesStore.java:35)
at org.apache.kafka.streams.state.internals.ChangeLoggingKeyValueBytesStore.put(ChangeLoggingKeyValueBytesStore.java:69)
at org.apache.kafka.streams.state.internals.ChangeLoggingKeyValueBytesStore.put(ChangeLoggingKeyValueBytesStore.java:31)
at org.apache.kafka.streams.state.internals.CachingKeyValueStore.putAndMaybeForward(CachingKeyValueStore.java:87)
at org.apache.kafka.streams.state.internals.CachingKeyValueStore.lambda$initInternal$0(CachingKeyValueStore.java:72)
at org.apache.kafka.streams.state.internals.NamedCache.flush(NamedCache.java:151)
at org.apache.kafka.streams.state.internals.NamedCache.flush(NamedCache.java:109)
at org.apache.kafka.streams.state.internals.ThreadCache.flush(ThreadCache.java:124)
at org.apache.kafka.streams.state.internals.CachingKeyValueStore.flush(CachingKeyValueStore.java:272)
at org.apache.kafka.streams.state.internals.WrappedStateStore.flush(WrappedStateStore.java:84)
at org.apache.kafka.streams.state.internals.MeteredKeyValueStore.lambda$flush$7(MeteredKeyValueStore.java:192)
at org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.maybeMeasureLatency(StreamsMetricsImpl.java:806)
at org.apache.kafka.streams.state.internals.MeteredKeyValueStore.flush(MeteredKeyValueStore.java:192)
at org.apache.kafka.streams.processor.internals.ProcessorStateManager.flush(ProcessorStateManager.java:285)
at org.apache.kafka.streams.processor.internals.AbstractTask.flushState(AbstractTask.java:177)
at org.apache.kafka.streams.processor.internals.StreamTask.flushState(StreamTask.java:554)
at org.apache.kafka.streams.processor.internals.StreamTask.commit(StreamTask.java:490)
at org.apache.kafka.streams.processor.internals.StreamTask.commit(StreamTask.java:478)
at org.apache.kafka.streams.processor.internals.AssignedTasks.commit(AssignedTasks.java:226)
at org.apache.kafka.streams.processor.internals.TaskManager.commitAll(TaskManager.java:543)
at org.apache.kafka.streams.processor.internals.StreamThread.maybeCommit(StreamThread.java:977)
at org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:823)
at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:697)
at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:670)
Caused by: org.apache.kafka.common.errors.RecordTooLargeException: The message is 1056245 bytes when serialized which is larger than 1048576, which is the value of the max.request.size configuration.
[2020-12-10 15:14:57,779] INFO stream-client [log-aggregation-e76da75d-e559-416b-9b7f-de107b1b3b65] State transition from ERROR to PENDING_SHUTDOWN (org.apache.kafka.streams.KafkaStreams:287)
[2020-12-10 15:14:57,782] INFO stream-thread [log-aggregation-e76da75d-e559-416b-9b7f-de107b1b3b65-StreamThread-1] Informed to shut down (org.apache.kafka.streams.processor.internals.StreamThread:1116)
[2020-12-10 15:14:57,785] INFO stream-client [log-aggregation-e76da75d-e559-416b-9b7f-de107b1b3b65] State transition from PENDING_SHUTDOWN to NOT_RUNNING (org.apache.kafka.streams.KafkaStreams:287)
[2020-12-10 15:14:57,785] INFO stream-client [log-aggregation-e76da75d-e559-416b-9b7f-de107b1b3b65] Streams client stopped completely (org.apache.kafka.streams.KafkaStreams:989)
[2020-12-10 15:14:57,793] INFO Stopped ServerConnector@183ec003{HTTP/1.1,[http/1.1]}{0.0.0.0:30004} (org.eclipse.jetty.server.AbstractConnector:380)
[2020-12-10 15:14:57,794] INFO node0 Stopped scavenging (org.eclipse.jetty.server.session:158)
[2020-12-10 15:14:57,804] INFO Stopped o.e.j.s.ServletContextHandler@417ad4f3{/,null,UNAVAILABLE} (org.eclipse.jetty.server.handler.ContextHandler:1016)
[2020-12-10 15:14:57,805] INFO Log aggregation stream stopped (com.tigergraph.kafka.stream.loadinglog.LogAggregation:179)
[2020-12-10 15:14:57,806] INFO REST server stopped (com.tigergraph.kafka.stream.loadinglog.LogAggregation:180)
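To put numbers on the error, here is a minimal sketch in plain Python that pulls the two sizes out of the exception text. The helper name `record_overage` and the regex are my own and are not part of Kafka or TigerGraph; the log line is copied from the output above. As far as I can tell, the record written to log-aggregation-loading-progress-changelog was only a few KiB over the 1 MiB (1048576-byte) max.request.size limit.

```python
import re

# Exception text copied verbatim from the KAFKASTRM-LL log.
LOG_LINE = (
    "org.apache.kafka.common.errors.RecordTooLargeException: "
    "The message is 1056245 bytes when serialized which is larger than "
    "1048576, which is the value of the max.request.size configuration."
)

# Assumption: the message wording matches what appears in this log.
# Other Kafka versions may phrase it differently.
PATTERN = re.compile(
    r"The message is (\d+) bytes when serialized which is larger than (\d+)"
)


def record_overage(line):
    """Return (record_size, limit, bytes_over) or None if the line does not match.

    Hypothetical helper for illustration only.
    """
    match = PATTERN.search(line)
    if match is None:
        return None
    size = int(match.group(1))
    limit = int(match.group(2))
    return size, limit, size - limit


if __name__ == "__main__":
    size, limit, over = record_overage(LOG_LINE)
    print(f"record: {size} bytes, limit: {limit} bytes, over by {over} bytes")
```

So the limit being hit looks like the 1 MiB value of the producer's max.request.size, and the failing record exceeds it by 7669 bytes.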
Is it a problem that this service is down? What does it do? Loading seems to continue. What can I do to resolve this?